This is an automated email from the ASF dual-hosted git repository. xxyu pushed a commit to branch kylin5 in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 858d0ae6da67e5ba1d418df4043af91f28611190 Author: Guoliang Sun <guoliang....@kyligence.io> AuthorDate: Tue Jan 3 13:39:27 2023 +0800 KYLIN-5459 Partial Log Governance --- build/conf/kylin-server-log4j.xml | 48 +++++++-- .../kylin/rest/aspect/SchedulerEnhancer.java | 2 +- .../apache/kylin/rest/broadcaster/Broadcaster.java | 8 +- .../apache/kylin/rest/service/AuditLogService.java | 10 +- .../service/task/QueryHistoryTaskScheduler.java | 8 +- .../apache/kylin/common/constant/LogConstant.java} | 18 ++-- .../transaction/AbstractAuditLogReplayWorker.java | 10 +- .../transaction/AuditLogReplayWorker.java | 18 +++- .../common/persistence/transaction/UnitOfWork.java | 52 ++++++---- .../kylin/common/scheduler/EventBusFactory.java | 16 ++- .../kylin/job/execution/DefaultExecutable.java | 2 + .../cube/storage/TotalStorageCollector.java | 2 +- .../apache/kylin/metadata/epoch/EpochManager.java | 108 ++++++++++++++------- .../metadata/recommendation/ref/OptRecV2.java | 4 +- .../kylin/metadata/epoch/EpochManagerTest.java | 22 +++++ .../common/metrics/MetricsInfluxdbReporter.java | 4 +- .../rest/scheduler/AutoRefreshSnapshotRunner.java | 8 +- .../org/apache/kylin/rest/service/JobService.java | 30 ++++-- .../apache/kylin/rest/service/JobServiceTest.java | 12 ++- .../kylin/rest/broadcaster/BroadcasterTest.java | 35 +++++++ .../kylin/rest/service/QueryCacheManager.java | 8 +- .../apache/kylin/rest/service/QueryService.java | 8 +- .../spark/source/NSparkMetadataExplorer.java | 2 +- .../engine/spark/builder/SegmentFlatTable.scala | 22 +++-- .../engine/spark/job/RDSegmentBuildExec.scala | 5 +- .../kylin/engine/spark/job/SegmentBuildJob.java | 2 + .../kylin/engine/spark/job/exec/BuildExec.scala | 8 +- .../kylin/engine/spark/job/exec/MergeExec.scala | 2 + .../kylin/engine/spark/job/exec/SnapshotExec.scala | 2 + .../engine/spark/job/exec/TableAnalyzerExec.scala | 2 + .../kylin/engine/spark/job/stage/StageExec.scala | 2 + .../engine/spark/job/stage/WaiteForResource.scala | 2 + 
.../engine/spark/job/stage/build/BuildDict.scala | 2 + .../engine/spark/job/stage/build/BuildLayer.scala | 2 + .../spark/job/stage/build/CostBasedPlanner.scala | 2 + .../job/stage/build/FlatTableAndDictBase.scala | 5 + .../job/stage/build/GatherFlatTableStats.scala | 2 + .../spark/job/stage/build/GenerateFlatTable.scala | 2 + .../stage/build/MaterializedFactTableView.scala | 2 + .../spark/job/stage/build/RefreshColumnBytes.scala | 2 + .../spark/job/stage/build/RefreshSnapshots.scala | 2 + .../stage/build/partition/PartitionBuildDict.scala | 2 + .../build/partition/PartitionBuildLayer.scala | 2 + .../partition/PartitionCostBasedPlanner.scala | 2 + .../partition/PartitionGatherFlatTableStats.scala | 1 + .../partition/PartitionGenerateFlatTable.scala | 2 + .../PartitionMaterializedFactTableView.scala | 2 + .../partition/PartitionRefreshColumnBytes.scala | 2 + .../spark/job/stage/merge/MergeColumnBytes.scala | 2 + .../spark/job/stage/merge/MergeFlatTable.scala | 2 + .../spark/job/stage/merge/MergeIndices.scala | 2 + .../partition/PartitionMergeColumnBytes.scala | 2 + .../merge/partition/PartitionMergeFlatTable.scala | 2 + .../merge/partition/PartitionMergeIndices.scala | 2 + .../spark/job/stage/snapshots/SnapshotsBuild.scala | 2 + .../job/stage/tablesampling/AnalyzerTable.scala | 2 + .../spark/job/stage/WaiteForResourceTest.scala} | 20 ++-- .../job/stage/build/RefreshColumnBytesTest.scala} | 37 +++---- .../job/stage/build/RefreshSnapshotsTest.scala} | 18 ++-- .../PartitionRefreshColumnBytesTest.scala} | 30 +++--- .../job/stage/merge/MergeColumnBytesTest.scala} | 18 +++- .../spark/job/stage/merge/MergeStageTest.scala | 2 + .../partition/PartitionMergeColumnBytesTest.scala} | 18 +++- .../org/apache/spark/utils/TestResourceUtils.scala | 6 ++ .../common/asyncprofiler/AsyncProfilerUtils.java | 2 +- .../org/apache/kylin/common/CustomUtils.scala} | 19 ++-- .../java/org/apache/kylin/tool/KylinLogTool.java | 2 +- 67 files changed, 499 insertions(+), 205 deletions(-) diff --git 
a/build/conf/kylin-server-log4j.xml b/build/conf/kylin-server-log4j.xml index 701e8071fc..93e0b51299 100644 --- a/build/conf/kylin-server-log4j.xml +++ b/build/conf/kylin-server-log4j.xml @@ -26,7 +26,7 @@ <PatternLayout pattern="%d{ISO8601} %-5p %X{request.project}[%t] %c{2} : %mask{%m}%n"/> </RollingRandomAccessFile> <Routing name="routing"> - <Routes pattern="${ctx:logCategory}"> + <Routes pattern="$${ctx:logCategory}"> <Route> <RollingFile name="rolling-${ctx:logCategory}" fileName="${env:KYLIN_HOME}/logs/kylin.${ctx:logCategory}.log" @@ -39,7 +39,7 @@ </RollingFile> </Route> - <Route ref="server" key="${ctx:logCategory}"/> + <Route ref="server" key="$${ctx:logCategory}"/> </Routes> </Routing> <RollingFile name="query-log-spark" fileName="${env:KYLIN_HOME}/logs/kylin.query.log" append="true" @@ -50,20 +50,48 @@ <DefaultRolloverStrategy max="10"/> <PatternLayout pattern="%d{ISO8601} %-5p %X{request.project}[%t] %c{2} : %mask{%m}%n"/> </RollingFile> + <RollingFile name="spark-history-server" fileName="${env:KYLIN_HOME}/logs/kylin.history_server.log" + append="true" + filePattern="${env:KYLIN_HOME}/logs/kylin.history_server.log.%i"> + <Policies> + <SizeBasedTriggeringPolicy size="268435456"/> + </Policies> + <DefaultRolloverStrategy max="10"/> + <PatternLayout pattern="%d{ISO8601} %-5p %X{request.project}[%t] %c{2} : %mask{%m}%n"/> + </RollingFile> + <RollingFile name="build-log-spark" fileName="${env:KYLIN_HOME}/logs/kylin.build.log" append="true" + filePattern="${env:KYLIN_HOME}/logs/kylin.build.log.%i"> + <Policies> + <SizeBasedTriggeringPolicy size="268435456"/> + </Policies> + <DefaultRolloverStrategy max="10"/> + <PatternLayout pattern="%d{ISO8601} %-5p %X{request.project}[%t] %c{2} : %mask{%m}%n"/> + </RollingFile> + <RollingFile name="metadata-log-spark" fileName="${env:KYLIN_HOME}/logs/kylin.metadata.log" append="true" + filePattern="${env:KYLIN_HOME}/logs/kylin.metadata.log.%i"> + <Policies> + <SizeBasedTriggeringPolicy size="268435456"/> + </Policies> + 
<DefaultRolloverStrategy max="10"/> + <PatternLayout pattern="%d{ISO8601} %-5p %X{request.project}[%t] %c{2} : %mask{%m}%n"/> + </RollingFile> </Appenders> <Loggers> <Root level="INFO"> <AppenderRef ref="routing"/> </Root> - <Logger name="org.apache.spark.scheduler.TaskSetManager" level="INFO" additivity="false"> + <Logger name="org.apache.spark.scheduler.TaskSetManager" level="WARN" additivity="false"> <AppenderRef ref="query-log-spark"/> </Logger> - <Logger name="org.apache.spark.scheduler.DAGScheduler" level="INFO" additivity="false"> + <Logger name="org.apache.spark.scheduler.DAGScheduler" level="WARN" additivity="false"> <AppenderRef ref="query-log-spark"/> </Logger> - <Logger name="org.apache.spark.scheduler.YarnScheduler" level="INFO" additivity="false"> + <Logger name="org.apache.spark.scheduler.YarnScheduler" level="WARN" additivity="false"> <AppenderRef ref="query-log-spark"/> </Logger> + <Logger name="org.apache.spark.deploy.history" level="INFO" additivity="false"> + <AppenderRef ref="spark-history-server"/> + </Logger> <Logger name="io.kyligence" level="DEBUG"/> <Logger name="org.springframework" level="WARN"/> <Logger name="org.apache.kylin" level="DEBUG"/> @@ -74,12 +102,20 @@ <Logger name="org.apache.kylin.ext" level="INFO"/> <!-- Query log --> <Logger name="org.apache.kylin.query" level="INFO"/> - <Logger name="org.apache.kylin.query" level="INFO"/> <Logger name="NDataflowCapabilityChecker" level="INFO" /> <Logger name="org.apache.kylin.common.util.CheckUtil" level="INFO" /> <Logger name="NQueryLayoutChooser" level="INFO" /> <Logger name="org.apache.kylin.query.runtime.plan.ResultPlan" level="INFO" /> <Logger name="org.apache.spark.sql.kylin.external.LogEx" level="INFO" /> <Logger name="org.apache.kylin.engine.spark.utils.LogEx" level="INFO" /> + <Logger name="org.apache.kylin.rest.service.QueryCacheManager" level="INFO"/> + <!-- Kerberos log --> + <Logger name="io.kyligence.kap.tool.kerberos" level="INFO"/> + <!-- Other log --> + <Logger 
name="org.apache.kylin.metadata.cube.storage.TotalStorageCollector" level="INFO" /> + <Logger name="org.apache.kylin.common.metrics.MetricsInfluxdbReporter" level="INFO" /> + <Logger name="io.kyligence.kap.metadata.recommendation.ref.OptRecV2" level="INFO" /> + <Logger name="org.apache.kylin.rest.security.LdapAuthenticationProvider" level="INFO" /> + <Logger name="org.apache.kylin.rest.aspect.SchedulerEnhancer" level="INFO" /> </Loggers> </Configuration> diff --git a/src/common-service/src/main/java/org/apache/kylin/rest/aspect/SchedulerEnhancer.java b/src/common-service/src/main/java/org/apache/kylin/rest/aspect/SchedulerEnhancer.java index b086f379d2..d39066c42c 100644 --- a/src/common-service/src/main/java/org/apache/kylin/rest/aspect/SchedulerEnhancer.java +++ b/src/common-service/src/main/java/org/apache/kylin/rest/aspect/SchedulerEnhancer.java @@ -35,7 +35,7 @@ public class SchedulerEnhancer { public void aroundScheduled(ProceedingJoinPoint pjp) throws Throwable { val config = KylinConfig.getInstanceFromEnv(); if (!"query".equals(config.getServerMode())) { - log.info("schedule at job leader"); + log.debug("schedule at job leader"); pjp.proceed(); } } diff --git a/src/common-service/src/main/java/org/apache/kylin/rest/broadcaster/Broadcaster.java b/src/common-service/src/main/java/org/apache/kylin/rest/broadcaster/Broadcaster.java index c7b36369c1..92b9686557 100644 --- a/src/common-service/src/main/java/org/apache/kylin/rest/broadcaster/Broadcaster.java +++ b/src/common-service/src/main/java/org/apache/kylin/rest/broadcaster/Broadcaster.java @@ -39,6 +39,8 @@ import java.util.stream.Stream; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang.ArrayUtils; +import org.apache.kylin.common.constant.LogConstant; +import org.apache.kylin.common.logging.SetLogCategory; import org.apache.kylin.common.persistence.transaction.BroadcastEventReadyNotifier; import org.apache.kylin.common.util.AddressUtil; import 
org.apache.kylin.common.util.DaemonThreadFactory; @@ -91,7 +93,9 @@ public class Broadcaster implements Closeable { public void announce(BroadcastEventReadyNotifier event) { if (eventQueue.contains(event)) { - logger.debug("broadcast event queue has contain this event: {}", event); + try (SetLogCategory ignored = new SetLogCategory(LogConstant.SCHEDULE_CATEGORY)) { + logger.debug("broadcast event queue has contain this event: {}", event); + } return; } if (!eventQueue.offer(event)) { @@ -100,7 +104,7 @@ public class Broadcaster implements Closeable { } public void consumeEvent() { - try { + try (SetLogCategory ignored = new SetLogCategory(LogConstant.SCHEDULE_CATEGORY)) { while (isRunning) { BroadcastEventReadyNotifier notifier = eventQueue.take(); handleEvent(notifier); diff --git a/src/common-service/src/main/java/org/apache/kylin/rest/service/AuditLogService.java b/src/common-service/src/main/java/org/apache/kylin/rest/service/AuditLogService.java index 1d79c2734f..1f60bfdd83 100644 --- a/src/common-service/src/main/java/org/apache/kylin/rest/service/AuditLogService.java +++ b/src/common-service/src/main/java/org/apache/kylin/rest/service/AuditLogService.java @@ -19,6 +19,8 @@ package org.apache.kylin.rest.service; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.constant.LogConstant; +import org.apache.kylin.common.logging.SetLogCategory; import org.apache.kylin.common.persistence.ResourceStore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,8 +33,10 @@ public class AuditLogService { public void notifyCatchUp() { ResourceStore store = ResourceStore.getKylinMetaStore(KylinConfig.getInstanceFromEnv()); - logger.info("Start to catchup manually"); - store.getAuditLogStore().catchup(); - logger.info("End to catchup manually"); + try (SetLogCategory ignored = new SetLogCategory(LogConstant.METADATA_CATEGORY)) { + logger.info("Start to catchup manually"); + store.getAuditLogStore().catchup(); + logger.info("End to catchup 
manually"); + } } } diff --git a/src/common-service/src/main/java/org/apache/kylin/rest/service/task/QueryHistoryTaskScheduler.java b/src/common-service/src/main/java/org/apache/kylin/rest/service/task/QueryHistoryTaskScheduler.java index eae1ed351b..07ceafe06c 100644 --- a/src/common-service/src/main/java/org/apache/kylin/rest/service/task/QueryHistoryTaskScheduler.java +++ b/src/common-service/src/main/java/org/apache/kylin/rest/service/task/QueryHistoryTaskScheduler.java @@ -33,6 +33,8 @@ import java.util.stream.Collectors; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.constant.LogConstant; +import org.apache.kylin.common.logging.SetLogCategory; import org.apache.kylin.common.util.ExecutorServiceUtil; import org.apache.kylin.common.util.NamedThreadFactory; import org.apache.kylin.common.util.Pair; @@ -98,7 +100,9 @@ public class QueryHistoryTaskScheduler { if (querySmartSupporter == null && SpringContext.getApplicationContext() != null) { querySmartSupporter = SpringContext.getBean(QuerySmartSupporter.class); } - log.debug("New QueryHistoryAccelerateScheduler created by project {}", project); + try (SetLogCategory ignored = new SetLogCategory(LogConstant.SCHEDULE_CATEGORY)) { + log.debug("New QueryHistoryAccelerateScheduler created by project {}", project); + } } public static QueryHistoryTaskScheduler getInstance(String project) { @@ -484,7 +488,7 @@ public class QueryHistoryTaskScheduler { @Override public void run() { - try { + try (SetLogCategory ignored = new SetLogCategory(LogConstant.SCHEDULE_CATEGORY)) { work(); } catch (Exception e) { log.warn("QueryHistory {} process failed of project({})", name(), project, e); diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/merge/MergeColumnBytes.scala b/src/core-common/src/main/java/org/apache/kylin/common/constant/LogConstant.java 
similarity index 69% copy from src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/merge/MergeColumnBytes.scala copy to src/core-common/src/main/java/org/apache/kylin/common/constant/LogConstant.java index 3091e92e0a..4a5293dffa 100644 --- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/merge/MergeColumnBytes.scala +++ b/src/core-common/src/main/java/org/apache/kylin/common/constant/LogConstant.java @@ -16,17 +16,15 @@ * limitations under the License. */ -package org.apache.kylin.engine.spark.job.stage.merge +package org.apache.kylin.common.constant; -import org.apache.kylin.engine.spark.job.SegmentJob -import org.apache.kylin.metadata.cube.model.NDataSegment +public class LogConstant { -class MergeColumnBytes(jobContext: SegmentJob, dataSegment: NDataSegment) - extends MergeStage(jobContext, dataSegment) { + private LogConstant() { + } - override def execute(): Unit = { - mergeColumnBytes() - - cleanup() - } + public static final String SCHEDULE_CATEGORY = "schedule"; + public static final String METADATA_CATEGORY = "metadata"; + public static final String QUERY_CATEGORY = "query"; + public static final String BUILD_CATEGORY = "build"; } diff --git a/src/core-common/src/main/java/org/apache/kylin/common/persistence/transaction/AbstractAuditLogReplayWorker.java b/src/core-common/src/main/java/org/apache/kylin/common/persistence/transaction/AbstractAuditLogReplayWorker.java index afa66f885c..9bc177891a 100644 --- a/src/core-common/src/main/java/org/apache/kylin/common/persistence/transaction/AbstractAuditLogReplayWorker.java +++ b/src/core-common/src/main/java/org/apache/kylin/common/persistence/transaction/AbstractAuditLogReplayWorker.java @@ -30,7 +30,9 @@ import java.util.function.Predicate; import org.apache.commons.collections.CollectionUtils; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.constant.LogConstant; import 
org.apache.kylin.common.exception.KylinException; +import org.apache.kylin.common.logging.SetLogCategory; import org.apache.kylin.common.persistence.AuditLog; import org.apache.kylin.common.persistence.UnitMessages; import org.apache.kylin.common.persistence.event.Event; @@ -116,9 +118,11 @@ public abstract class AbstractAuditLogReplayWorker { } } - for (UnitMessages message : messagesMap.values()) { - log.debug("replay {} event for project:{}", message.getMessages().size(), message.getKey()); - replayer.replay(message); + try (SetLogCategory ignored = new SetLogCategory(LogConstant.METADATA_CATEGORY)) { + for (UnitMessages message : messagesMap.values()) { + log.debug("replay {} event for project:{}", message.getMessages().size(), message.getKey()); + replayer.replay(message); + } } } diff --git a/src/core-common/src/main/java/org/apache/kylin/common/persistence/transaction/AuditLogReplayWorker.java b/src/core-common/src/main/java/org/apache/kylin/common/persistence/transaction/AuditLogReplayWorker.java index 2ecb5041e1..63299c0aa4 100644 --- a/src/core-common/src/main/java/org/apache/kylin/common/persistence/transaction/AuditLogReplayWorker.java +++ b/src/core-common/src/main/java/org/apache/kylin/common/persistence/transaction/AuditLogReplayWorker.java @@ -33,6 +33,8 @@ import java.util.stream.LongStream; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang.StringUtils; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.constant.LogConstant; +import org.apache.kylin.common.logging.SetLogCategory; import org.apache.kylin.common.persistence.ResourceStore; import org.apache.kylin.common.persistence.VersionConflictException; import org.apache.kylin.common.persistence.AuditLog; @@ -103,7 +105,7 @@ public class AuditLogReplayWorker extends AbstractAuditLogReplayWorker { log.info("Catchup Already stopped"); return; } - try { + try (SetLogCategory ignored = new SetLogCategory(LogConstant.METADATA_CATEGORY)) { 
catchupToMaxId(logOffset); } catch (TransactionException | DatabaseNotAvailableException e) { log.warn("cannot create transaction or auditlog database connect error, ignore it", e); @@ -148,7 +150,9 @@ public class AuditLogReplayWorker extends AbstractAuditLogReplayWorker { } if (CollectionUtils.isEmpty(needReplayedIdList)) { - log.debug("needReplayedIdList is empty"); + try (SetLogCategory ignored = new SetLogCategory(LogConstant.METADATA_CATEGORY)) { + log.debug("needReplayedIdList is empty"); + } return Lists.newArrayList(); } @@ -160,7 +164,7 @@ public class AuditLogReplayWorker extends AbstractAuditLogReplayWorker { return; } - try { + try (SetLogCategory ignored = new SetLogCategory(LogConstant.METADATA_CATEGORY)) { val fetchAuditLog = auditLogStore.fetch(needReplayedIdList); if (CollectionUtils.isEmpty(fetchAuditLog)) { return; @@ -200,7 +204,9 @@ public class AuditLogReplayWorker extends AbstractAuditLogReplayWorker { return -1L; } - log.debug("start restore from {}", currentWindow); + try (SetLogCategory ignored = new SetLogCategory(LogConstant.METADATA_CATEGORY)) { + log.debug("start restore from {}", currentWindow); + } val stepWin = new SlideWindow(currentWindow); while (stepWin.forwardRightStep(STEP)) { @@ -211,7 +217,9 @@ public class AuditLogReplayWorker extends AbstractAuditLogReplayWorker { } stepWin.syncRightStep(); } - log.debug("end restore from {}, delay queue:{}", currentWindow, delayIdQueue.size()); + try (SetLogCategory ignored = new SetLogCategory(LogConstant.METADATA_CATEGORY)) { + log.debug("end restore from {}, delay queue:{}", currentWindow, delayIdQueue.size()); + } return currentWindow.getEnd(); }); diff --git a/src/core-common/src/main/java/org/apache/kylin/common/persistence/transaction/UnitOfWork.java b/src/core-common/src/main/java/org/apache/kylin/common/persistence/transaction/UnitOfWork.java index 4e576d4e4f..b256db1fdd 100644 --- a/src/core-common/src/main/java/org/apache/kylin/common/persistence/transaction/UnitOfWork.java 
+++ b/src/core-common/src/main/java/org/apache/kylin/common/persistence/transaction/UnitOfWork.java @@ -23,8 +23,10 @@ import java.util.function.Consumer; import java.util.stream.Collectors; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.constant.LogConstant; import org.apache.kylin.common.exception.KylinException; import org.apache.kylin.common.exception.code.ErrorCodeSystem; +import org.apache.kylin.common.logging.SetLogCategory; import org.apache.kylin.common.persistence.InMemResourceStore; import org.apache.kylin.common.persistence.RawResource; import org.apache.kylin.common.persistence.ResourceStore; @@ -111,11 +113,13 @@ public class UnitOfWork { try { T ret; - if (retry != 1) { - log.debug("UnitOfWork {} in project {} is retrying for {}th time", traceId, params.getUnitName(), - retry); - } else { - log.debug("UnitOfWork {} started on project {}", traceId, params.getUnitName()); + try (SetLogCategory ignored = new SetLogCategory(LogConstant.METADATA_CATEGORY)) { + if (retry != 1) { + log.debug("UnitOfWork {} in project {} is retrying for {}th time", traceId, params.getUnitName(), + retry); + } else { + log.debug("UnitOfWork {} started on project {}", traceId, params.getUnitName()); + } } long startTime = System.currentTimeMillis(); @@ -124,7 +128,9 @@ public class UnitOfWork { long startTransactionTime = System.currentTimeMillis(); val waitForLockTime = startTransactionTime - startTime; if (waitForLockTime > 3000) { - log.warn("UnitOfWork {} takes too long time {}ms to start", traceId, waitForLockTime); + try (SetLogCategory ignored = new SetLogCategory(LogConstant.METADATA_CATEGORY)) { + log.warn("UnitOfWork {} takes too long time {}ms to start", traceId, waitForLockTime); + } } ret = params.getProcessor().process(); @@ -137,7 +143,7 @@ public class UnitOfWork { handleError(throwable, params, retry, traceId); } finally { if (isAlreadyInTransaction()) { - try { + try (SetLogCategory ignored = new 
SetLogCategory(LogConstant.METADATA_CATEGORY)) { val unitOfWork = UnitOfWork.get(); unitOfWork.getCurrentLock().unlock(); unitOfWork.cleanResource(); @@ -161,13 +167,15 @@ public class UnitOfWork { } private static void logIfLongTransaction(long duration, String traceId) { - if (duration > 3000) { - log.warn("UnitOfWork {} takes too long time {}ms to complete", traceId, duration); - if (duration > 10000) { - log.warn("current stack: ", new Throwable()); + try (SetLogCategory ignored = new SetLogCategory(LogConstant.METADATA_CATEGORY)) { + if (duration > 3000) { + log.warn("UnitOfWork {} takes too long time {}ms to complete", traceId, duration); + if (duration > 10000) { + log.warn("current stack: ", new Throwable()); + } + } else { + log.debug("UnitOfWork {} takes {}ms to complete", traceId, duration); } - } else { - log.debug("UnitOfWork {} takes {}ms to complete", traceId, duration); } } @@ -178,7 +186,9 @@ public class UnitOfWork { val lock = params.getTempLockName() == null ? TransactionLock.getLock(project, readonly) : TransactionLock.getLock(params.getTempLockName(), readonly); - log.trace("get lock for project {}, lock is held by current thread: {}", project, lock.isHeldByCurrentThread()); + try (SetLogCategory ignored = new SetLogCategory(LogConstant.METADATA_CATEGORY)) { + log.trace("get lock for project {}, lock is held by current thread: {}", project, lock.isHeldByCurrentThread()); + } //re-entry is not encouraged (because it indicates complex handling logic, bad smell), let's abandon it first Preconditions.checkState(!lock.isHeldByCurrentThread()); lock.lock(); @@ -202,7 +212,9 @@ public class UnitOfWork { ResourceStore.setRS(configCopy, rs); unitOfWork.setLocalConfig(KylinConfig.setAndUnsetThreadLocalConfig(configCopy)); - log.trace("sandbox RS {} now takes place for main RS {}", rs, underlying); + try (SetLogCategory ignored = new SetLogCategory(LogConstant.METADATA_CATEGORY)) { + log.trace("sandbox RS {} now takes place for main RS {}", rs, 
underlying); + } return unitOfWork; } @@ -248,14 +260,16 @@ public class UnitOfWork { val unitMessages = packageEvents(eventList, get().getProject(), traceId, writeInterceptor); long entitiesSize = unitMessages.getMessages().stream().filter(event -> event instanceof ResourceRelatedEvent) .count(); - log.debug("transaction {} updates {} metadata items", traceId, entitiesSize); + try (SetLogCategory ignored = new SetLogCategory(LogConstant.METADATA_CATEGORY)) { + log.debug("transaction {} updates {} metadata items", traceId, entitiesSize); + } checkEpoch(params); val unitName = params.getUnitName(); metadataStore.batchUpdate(unitMessages, get().getParams().isSkipAuditLog(), unitName, params.getEpochId()); if (entitiesSize != 0 && !params.isReadonly() && !params.isSkipAuditLog() && !config.isUTEnv()) { factory.postAsync(new AuditLogBroadcastEventNotifier()); } - try { + try (SetLogCategory ignored = new SetLogCategory(LogConstant.METADATA_CATEGORY)) { // replayInTransaction in leader before release lock val replayer = MessageSynchronization.getInstance(originConfig); replayer.replayInTransaction(unitMessages); @@ -284,7 +298,9 @@ public class UnitOfWork { } if (retry == 1) { - log.warn("transaction failed at first time, traceId:" + traceId, throwable); + try (SetLogCategory ignored = new SetLogCategory(LogConstant.METADATA_CATEGORY)) { + log.warn("transaction failed at first time, traceId:" + traceId, throwable); + } } } diff --git a/src/core-common/src/main/java/org/apache/kylin/common/scheduler/EventBusFactory.java b/src/core-common/src/main/java/org/apache/kylin/common/scheduler/EventBusFactory.java index ca476ed474..92da48642f 100644 --- a/src/core-common/src/main/java/org/apache/kylin/common/scheduler/EventBusFactory.java +++ b/src/core-common/src/main/java/org/apache/kylin/common/scheduler/EventBusFactory.java @@ -27,6 +27,8 @@ import java.util.concurrent.TimeUnit; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.Singletons; +import 
org.apache.kylin.common.constant.LogConstant; +import org.apache.kylin.common.logging.SetLogCategory; import org.apache.kylin.common.util.ExecutorServiceUtil; import org.apache.kylin.common.util.NamedThreadFactory; import org.apache.kylin.common.persistence.transaction.BroadcastEventReadyNotifier; @@ -126,7 +128,9 @@ public class EventBusFactory { } public void postAsync(SchedulerEventNotifier event) { - log.debug("Post event {} async", event); + try (SetLogCategory ignored = new SetLogCategory(LogConstant.SCHEDULE_CATEGORY)) { + log.debug("Post event {} async", event); + } if (event instanceof BroadcastEventReadyNotifier) { broadcastEventBus.post(event); } else { @@ -135,12 +139,16 @@ public class EventBusFactory { } public void postSync(Object event) { - log.debug("Post event {} sync", event); + try (SetLogCategory ignored = new SetLogCategory(LogConstant.SCHEDULE_CATEGORY)) { + log.debug("Post event {} sync", event); + } syncEventBus.post(event); } public void callService(Object event) { - log.debug("Post Service event {} sync", event); + try (SetLogCategory ignored = new SetLogCategory(LogConstant.SCHEDULE_CATEGORY)) { + log.debug("Post Service event {} sync", event); + } serviceEventBus.post(event); } @@ -153,7 +161,7 @@ public class EventBusFactory { private void stopThreadPool(ExecutorService executor) { executor.shutdown(); - try { + try (SetLogCategory ignored = new SetLogCategory(LogConstant.SCHEDULE_CATEGORY)) { if (!executor.awaitTermination(6000, TimeUnit.SECONDS)) { ExecutorServiceUtil.forceShutdown(executor); } diff --git a/src/core-job/src/main/java/org/apache/kylin/job/execution/DefaultExecutable.java b/src/core-job/src/main/java/org/apache/kylin/job/execution/DefaultExecutable.java index 3f22a1c502..5a4e92e58c 100644 --- a/src/core-job/src/main/java/org/apache/kylin/job/execution/DefaultExecutable.java +++ b/src/core-job/src/main/java/org/apache/kylin/job/execution/DefaultExecutable.java @@ -71,10 +71,12 @@ public class DefaultExecutable extends 
AbstractExecutable implements ChainedExec List<Executable> executables = getTasks().stream().map(Executable.class::cast).collect(Collectors.toList()); switch (getJobSchedulerMode()) { case DAG: + logger.info("Execute in DAG mode."); dagSchedule(executables, context); break; case CHAIN: default: + logger.info("Execute in CHAIN mode."); chainedSchedule(executables, context); break; } diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/storage/TotalStorageCollector.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/storage/TotalStorageCollector.java index f6b93383ea..74c30ab86a 100644 --- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/storage/TotalStorageCollector.java +++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/storage/TotalStorageCollector.java @@ -36,7 +36,7 @@ public class TotalStorageCollector implements StorageInfoCollector { public void doCollect(KylinConfig config, String project, StorageVolumeInfo storageVolumeInfo) throws IOException { long totalStorageSize = hdfsCapacityMetrics.getHdfsCapacityByProject(project); if (totalStorageSize != -1L) { - log.info("Reuse workingDirCapacity by project {}, storageSize: {}", project, totalStorageSize); + log.debug("Reuse workingDirCapacity by project {}, storageSize: {}", project, totalStorageSize); storageVolumeInfo.setTotalStorageSize(totalStorageSize); return; } diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/epoch/EpochManager.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/epoch/EpochManager.java index cbe1ba9590..e8d3acc29b 100644 --- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/epoch/EpochManager.java +++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/epoch/EpochManager.java @@ -45,6 +45,8 @@ import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang.StringUtils; import org.apache.kylin.common.KylinConfig; import 
org.apache.kylin.common.Singletons; +import org.apache.kylin.common.constant.LogConstant; +import org.apache.kylin.common.logging.SetLogCategory; import org.apache.kylin.common.persistence.metadata.Epoch; import org.apache.kylin.common.persistence.metadata.EpochStore; import org.apache.kylin.common.persistence.transaction.UnitOfWork; @@ -72,7 +74,7 @@ import lombok.Synchronized; import lombok.val; public class EpochManager { - private static final Logger logger = LoggerFactory.getLogger(EpochManager.class); + private static final Logger logger = LoggerFactory.getLogger(LogConstant.METADATA_CATEGORY); public static EpochManager getInstance() { return Singletons.getInstance(EpochManager.class, clz -> { @@ -174,13 +176,17 @@ public class EpochManager { if (CollectionUtils.isNotEmpty(outdatedProjects)) { outdatedProjects.forEach(EpochManager.this::deleteEpoch); notifierEscapedProject(outdatedProjects); - logger.warn("remove outdated epoch list :{}", String.join(",", outdatedProjects)); + try (SetLogCategory ignored = new SetLogCategory(LogConstant.METADATA_CATEGORY)) { + logger.warn("remove outdated epoch list :{}", String.join(",", outdatedProjects)); + } } } @Synchronized("renewLock") public void tryRenewOwnedEpochs() { - logger.debug("Start renew owned epoch........."); + try (SetLogCategory ignored = new SetLogCategory(LogConstant.METADATA_CATEGORY)) { + logger.debug("Start renew owned epoch........."); + } long startTime = System.currentTimeMillis(); //1.check and get project @@ -197,7 +203,9 @@ public class EpochManager { } if (CollectionUtils.isEmpty(oriOwnedEpochSet)) { - logger.info("current node own none project, end renew..."); + try (SetLogCategory ignored = new SetLogCategory(LogConstant.METADATA_CATEGORY)) { + logger.info("current node own none project, end renew..."); + } return; } @@ -208,8 +216,10 @@ public class EpochManager { notifierAfterUpdatedEpoch("renew", lastRenewEpochSet, afterRenewEpochSets); lastRenewEpochSet.clear(); 
lastRenewEpochSet.addAll(afterRenewEpochSets); - logger.debug("End renew owned epoch,cost:{}.........", - TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - startTime)); + try (SetLogCategory ignored = new SetLogCategory(LogConstant.METADATA_CATEGORY)) { + logger.debug("End renew owned epoch,cost:{}.........", + TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - startTime)); + } } private Set<String> innerRenewEpochWithRetry(Set<Epoch> oriEpochs) { @@ -251,7 +261,7 @@ public class EpochManager { totalTask.forEach(taskEpochList -> { val epochTargetList = taskEpochList.stream().map(Epoch::getEpochTarget).collect(Collectors.toList()); renewExecutor.submit(() -> { - try { + try (SetLogCategory ignored = new SetLogCategory(LogConstant.METADATA_CATEGORY)) { if (CollectionUtils.isNotEmpty(epochTargetList)) { batchRenewEpoch(taskEpochList); newRenewEpochSets.addAll(epochTargetList); @@ -264,7 +274,7 @@ public class EpochManager { }); }); - try { + try (SetLogCategory ignored = new SetLogCategory(LogConstant.METADATA_CATEGORY)) { if (!countDownLatch.await(epochRenewTimeout, TimeUnit.SECONDS)) { logger.error("renew not finished,{}/{}...", newRenewEpochSets.size(), oriEpochs.size()); } @@ -276,7 +286,9 @@ public class EpochManager { @Synchronized("updateLock") public void tryUpdateAllEpochs() { - logger.debug("Start update Epochs........."); + try (SetLogCategory ignored = new SetLogCategory(LogConstant.METADATA_CATEGORY)) { + logger.debug("Start update Epochs........."); + } long startTime = System.currentTimeMillis(); //1.check and get project @@ -294,7 +306,9 @@ public class EpochManager { } if (CollectionUtils.isEmpty(projects)) { - logger.debug("don't have more new project, end update..."); + try (SetLogCategory ignored = new SetLogCategory(LogConstant.METADATA_CATEGORY)) { + logger.debug("don't have more new project, end update..."); + } return; } @@ -303,8 +317,10 @@ public class EpochManager { notifierAfterUpdatedEpoch("update", 
Collections.emptySet(), updatedMewEpochs); - logger.debug("End update Epochs,cost:{}:.........", - TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - startTime)); + try (SetLogCategory ignored = new SetLogCategory(LogConstant.METADATA_CATEGORY)) { + logger.debug("End update Epochs,cost:{}:.........", + TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - startTime)); + } } private Set<String> tryUpdateEpochByProjects(final List<String> projects) { @@ -345,18 +361,24 @@ public class EpochManager { eventBusFactory.postAsync(new ProjectEscapedNotifier(project)); } - logger.warn("notifier escaped project:{}", String.join(",", escapedProjects)); + try (SetLogCategory ignored = new SetLogCategory(LogConstant.METADATA_CATEGORY)) { + logger.warn("notifier escaped project:{}", String.join(",", escapedProjects)); + } } private void notifierAfterUpdatedEpoch(String updateTypeName, Set<String> oriEpochs, Set<String> newEpochs) { - logger.debug("after {} new epoch size:{}, Project {} owned by {}", updateTypeName, newEpochs.size(), + try (SetLogCategory ignored = new SetLogCategory(LogConstant.METADATA_CATEGORY)) { + logger.debug("after {} new epoch size:{}, Project {} owned by {}", updateTypeName, newEpochs.size(), String.join(",", newEpochs), identity); + } if (CollectionUtils.isNotEmpty(newEpochs)) { Collection<String> newControlledProjects = new HashSet<>(Sets.difference(newEpochs, oriEpochs)); if (CollectionUtils.isNotEmpty(newControlledProjects)) { - logger.debug("after {} controlled projects: {}", updateTypeName, + try (SetLogCategory ignored = new SetLogCategory(LogConstant.METADATA_CATEGORY)) { + logger.debug("after {} controlled projects: {}", updateTypeName, String.join(",", newControlledProjects)); + } newControlledProjects.forEach(p -> eventBusFactory.postAsync(new ProjectControlledNotifier(p))); } } @@ -364,7 +386,9 @@ public class EpochManager { if (CollectionUtils.isNotEmpty(oriEpochs)) { Collection<String> escapedProjects = new 
HashSet<>(Sets.difference(oriEpochs, newEpochs)); if (CollectionUtils.isNotEmpty(escapedProjects)) { - logger.debug("after {} escaped projects: {}", updateTypeName, String.join(",", escapedProjects)); + try (SetLogCategory ignored = new SetLogCategory(LogConstant.METADATA_CATEGORY)) { + logger.debug("after {} escaped projects: {}", updateTypeName, String.join(",", escapedProjects)); + } notifierEscapedProject(escapedProjects); } } @@ -531,7 +555,7 @@ public class EpochManager { return false; } return EpochUpdateLockManager.executeEpochWithLock(epochTarget, () -> { - try { + try (SetLogCategory ignored = new SetLogCategory(LogConstant.METADATA_CATEGORY)) { Epoch epoch = epochStore.getEpoch(epochTarget); Pair<Epoch, Epoch> oldNewEpochPair = oldEpoch2NewEpoch(epoch, epochTarget, force, null); @@ -672,23 +696,25 @@ public class EpochManager { } private boolean isEpochLegal(Epoch epoch) { - if (epoch == null) { - logger.debug("Get null epoch"); - return false; - } else if (StringUtils.isEmpty(epoch.getCurrentEpochOwner())) { - logger.debug("Epoch {}'s owner is empty", epoch); - return false; - } else if (System.currentTimeMillis() - epoch.getLastEpochRenewTime() > epochExpiredTime * 1000) { - logger.warn("Epoch {}'s last renew time is expired. Current time is {}, expiredTime is {}", epoch, - System.currentTimeMillis(), epochExpiredTime); - return false; - } + try (SetLogCategory ignored = new SetLogCategory(LogConstant.METADATA_CATEGORY)) { + if (epoch == null) { + logger.debug("Get null epoch"); + return false; + } else if (StringUtils.isEmpty(epoch.getCurrentEpochOwner())) { + logger.debug("Epoch {}'s owner is empty", epoch); + return false; + } else if (System.currentTimeMillis() - epoch.getLastEpochRenewTime() > epochExpiredTime * 1000) { + logger.warn("Epoch {}'s last renew time is expired. 
Current time is {}, expiredTime is {}", epoch, + System.currentTimeMillis(), epochExpiredTime); + return false; + } - ResourceGroupManager rgManager = ResourceGroupManager.getInstance(config); - String epochServer = getHostAndPort(epoch.getCurrentEpochOwner()); - if (!rgManager.instanceHasPermissionToOwnEpochTarget(epoch.getEpochTarget(), epochServer)) { - logger.debug("Epoch {}'s owner is not in build request type resource group.", epoch); - return false; + ResourceGroupManager rgManager = ResourceGroupManager.getInstance(config); + String epochServer = getHostAndPort(epoch.getCurrentEpochOwner()); + if (!rgManager.instanceHasPermissionToOwnEpochTarget(epoch.getEpochTarget(), epochServer)) { + logger.debug("Epoch {}'s owner is not in build request type resource group.", epoch); + return false; + } } return true; } @@ -713,7 +739,9 @@ public class EpochManager { if (!isGlobalProject(epochTargetTemp)) { val targetProjectInstance = NProjectManager.getInstance(config).getProject(epochTargetTemp); if (Objects.isNull(targetProjectInstance)) { - logger.warn("get epoch failed, because the project:{} dose not exist", epochTargetTemp); + try (SetLogCategory ignored = new SetLogCategory(LogConstant.METADATA_CATEGORY)) { + logger.warn("get epoch failed, because the project:{} dose not exist", epochTargetTemp); + } return null; } @@ -762,7 +790,9 @@ public class EpochManager { public void deleteEpoch(String epochTarget) { EpochUpdateLockManager.executeEpochWithLock(epochTarget, () -> { epochStore.delete(epochTarget); - logger.debug("delete epoch:{}", epochTarget); + try (SetLogCategory ignored = new SetLogCategory(LogConstant.METADATA_CATEGORY)) { + logger.debug("delete epoch:{}", epochTarget); + } return null; }); } @@ -790,7 +820,9 @@ public class EpochManager { private boolean checkInMaintenanceMode() { if (isMaintenanceMode()) { - logger.debug("System is currently undergoing maintenance. 
Abort updating Epochs"); + try (SetLogCategory ignored = new SetLogCategory(LogConstant.METADATA_CATEGORY)) { + logger.debug("System is currently undergoing maintenance. Abort updating Epochs"); + } return true; } return false; @@ -802,7 +834,9 @@ public class EpochManager { // when shutdown or meta data is inconsistent public void releaseOwnedEpochs() { - logger.info("Release owned epochs"); + try (SetLogCategory ignored = new SetLogCategory(LogConstant.METADATA_CATEGORY)) { + logger.info("Release owned epochs"); + } epochStore.executeWithTransaction(() -> { val epochs = epochStore.list().stream().filter(this::checkEpochOwnerOnly).collect(Collectors.toList()); epochs.forEach(epoch -> { diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/recommendation/ref/OptRecV2.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/recommendation/ref/OptRecV2.java index 41c9d6100f..44da2716cb 100644 --- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/recommendation/ref/OptRecV2.java +++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/recommendation/ref/OptRecV2.java @@ -114,7 +114,7 @@ public class OptRecV2 { } public void initRecommendation() { - log.info("Start to initialize recommendation({}/{}}", project, getUuid()); + log.debug("Start to initialize recommendation({}/{}}", project, getUuid()); NDataModel dataModel = getModel(); if (dataModel.isBroken()) { @@ -133,7 +133,7 @@ public class OptRecV2 { } public List<RawRecItem> filterExcludedRecPatterns(List<RawRecItem> rawRecItems) { - log.info("Start to initialize recommendation patterns({}/{}}", project, getUuid()); + log.debug("Start to initialize recommendation patterns({}/{}}", project, getUuid()); NDataModel dataModel = getModel(); if (dataModel.isBroken()) { log.warn("Discard all related recommendations for model({}/{}) is broken.", project, uuid); diff --git a/src/core-metadata/src/test/java/org/apache/kylin/metadata/epoch/EpochManagerTest.java 
b/src/core-metadata/src/test/java/org/apache/kylin/metadata/epoch/EpochManagerTest.java index d488dfe67b..425d81697d 100644 --- a/src/core-metadata/src/test/java/org/apache/kylin/metadata/epoch/EpochManagerTest.java +++ b/src/core-metadata/src/test/java/org/apache/kylin/metadata/epoch/EpochManagerTest.java @@ -467,4 +467,26 @@ class EpochManagerTest { } } + @Test + void testUpdateAllEpochsSuccess() { + Epoch e1 = new Epoch(); + e1.setEpochTarget("test1"); + e1.setCurrentEpochOwner("owner1"); + e1.setEpochId(1); + e1.setLastEpochRenewTime(System.currentTimeMillis()); + + Epoch e2 = new Epoch(); + e2.setEpochTarget("test2"); + e2.setCurrentEpochOwner("owner2"); + e2.setEpochId(1); + e2.setLastEpochRenewTime(System.currentTimeMillis()); + + getEpochStore().insertBatch(Arrays.asList(e1, e2)); + + EpochManager epochManager = EpochManager.getInstance(); + epochManager.tryUpdateEpoch(EpochManager.GLOBAL, false); + epochManager.updateAllEpochs(); + Assertions.assertFalse(epochManager.getOwnedEpochs().isEmpty()); + } + } diff --git a/src/core-metrics/src/main/java/org/apache/kylin/common/metrics/MetricsInfluxdbReporter.java b/src/core-metrics/src/main/java/org/apache/kylin/common/metrics/MetricsInfluxdbReporter.java index e33e4aafee..087555f533 100644 --- a/src/core-metrics/src/main/java/org/apache/kylin/common/metrics/MetricsInfluxdbReporter.java +++ b/src/core-metrics/src/main/java/org/apache/kylin/common/metrics/MetricsInfluxdbReporter.java @@ -114,7 +114,7 @@ public class MetricsInfluxdbReporter implements MetricsReporter { dailyInstance.init(); Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(() -> { try { - logger.info("Start to aggregate daily metrics ..."); + logger.debug("Start to aggregate daily metrics ..."); long now = System.currentTimeMillis(); long todayStart = TimeUtil.getDayStart(now); @@ -141,7 +141,7 @@ public class MetricsInfluxdbReporter implements MetricsReporter { updateDailyMetrics(todayStart, config); retry.set(0); - 
logger.info("Aggregate daily metrics success ..."); + logger.debug("Aggregate daily metrics success ..."); } catch (Exception e) { retry.incrementAndGet(); logger.error("Failed to aggregate daily metrics, retry: {}", retry.get(), e); diff --git a/src/data-loading-service/src/main/java/org/apache/kylin/rest/scheduler/AutoRefreshSnapshotRunner.java b/src/data-loading-service/src/main/java/org/apache/kylin/rest/scheduler/AutoRefreshSnapshotRunner.java index 294b6ddb75..6bcbe1cfbd 100644 --- a/src/data-loading-service/src/main/java/org/apache/kylin/rest/scheduler/AutoRefreshSnapshotRunner.java +++ b/src/data-loading-service/src/main/java/org/apache/kylin/rest/scheduler/AutoRefreshSnapshotRunner.java @@ -42,6 +42,7 @@ import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.Path; import org.apache.http.HttpStatus; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.constant.LogConstant; import org.apache.kylin.common.exception.KylinException; import org.apache.kylin.common.exception.KylinRuntimeException; import org.apache.kylin.common.logging.SetLogCategory; @@ -73,7 +74,6 @@ import lombok.extern.slf4j.Slf4j; @Slf4j public class AutoRefreshSnapshotRunner implements Runnable { private static final String SNAPSHOT_VIEW_MAPPING_ERROR_MESSAGE = "Project[%s] Save View Mapping Failed"; - private static final String SCHEDULE_LOG_CATEGORY = "schedule"; private static final Map<String, AutoRefreshSnapshotRunner> INSTANCE_MAP = Maps.newConcurrentMap(); @Setter @@ -131,7 +131,7 @@ public class AutoRefreshSnapshotRunner implements Runnable { } public void doRun() { - try (SetLogCategory ignored = new SetLogCategory(SCHEDULE_LOG_CATEGORY)) { + try (SetLogCategory ignored = new SetLogCategory(LogConstant.SCHEDULE_CATEGORY)) { log.info("Project[{}] start check and refresh snapshot", project); if (log.isDebugEnabled()) { val poolExecutor = (ThreadPoolExecutor) jobPool; @@ -346,7 +346,7 @@ public class AutoRefreshSnapshotRunner implements 
Runnable { @Override public void run() { - try (SetLogCategory ignored = new SetLogCategory(SCHEDULE_LOG_CATEGORY)) { + try (SetLogCategory ignored = new SetLogCategory(LogConstant.SCHEDULE_CATEGORY)) { saveMarkFile(); doRun(); } catch (Exception e) { @@ -357,7 +357,7 @@ public class AutoRefreshSnapshotRunner implements Runnable { } public void runWhenSchedulerInit() { - try (SetLogCategory ignored = new SetLogCategory(SCHEDULE_LOG_CATEGORY)) { + try (SetLogCategory ignored = new SetLogCategory(LogConstant.SCHEDULE_CATEGORY)) { doRun(); } catch (Exception e) { log.error(e.getMessage(), e); diff --git a/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/JobService.java b/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/JobService.java index c9305850a9..b3b902b4b6 100644 --- a/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/JobService.java +++ b/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/JobService.java @@ -54,6 +54,7 @@ import org.apache.commons.lang.StringUtils; import org.apache.kylin.cluster.ClusterManagerFactory; import org.apache.kylin.cluster.IClusterManager; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.constant.LogConstant; import org.apache.kylin.common.exception.ErrorCode; import org.apache.kylin.common.exception.ExceptionReason; import org.apache.kylin.common.exception.ExceptionResolve; @@ -61,6 +62,7 @@ import org.apache.kylin.common.exception.JobErrorCode; import org.apache.kylin.common.exception.JobExceptionReason; import org.apache.kylin.common.exception.JobExceptionResolve; import org.apache.kylin.common.exception.KylinException; +import org.apache.kylin.common.logging.SetLogCategory; import org.apache.kylin.common.metrics.MetricsCategory; import org.apache.kylin.common.metrics.MetricsGroup; import org.apache.kylin.common.metrics.MetricsName; @@ -156,7 +158,7 @@ public class JobService extends BasicService implements JobSupporter, 
ISmartAppl @Autowired private ModelService modelService; - private static final Logger logger = LoggerFactory.getLogger("schedule"); + private static final Logger logger = LoggerFactory.getLogger(LogConstant.BUILD_CATEGORY); private static final Map<String, String> jobTypeMap = Maps.newHashMap(); private static final String LAST_MODIFIED = "last_modified"; @@ -615,7 +617,7 @@ public class JobService extends BasicService implements JobSupporter, ISmartAppl // waite time in output Map<String, String> waiteTimeMap; val output = executable.getOutput(); - try { + try (SetLogCategory ignored = new SetLogCategory(LogConstant.BUILD_CATEGORY)) { waiteTimeMap = JsonUtil.readValueAsMap(output.getExtra().getOrDefault(NBatchConstants.P_WAITE_TIME, "{}")); } catch (IOException e) { logger.error(e.getMessage(), e); @@ -710,7 +712,7 @@ public class JobService extends BasicService implements JobSupporter, ISmartAppl } public void setExceptionResolveAndCodeAndReason(Output output, ExecutableStepResponse executableStepResponse) { - try { + try (SetLogCategory ignored = new SetLogCategory(LogConstant.BUILD_CATEGORY)) { val exceptionCode = getExceptionCode(output); executableStepResponse.setFailedResolve(ExceptionResolve.getResolve(exceptionCode)); executableStepResponse.setFailedCode(ErrorCode.getLocalizedString(exceptionCode)); @@ -734,7 +736,7 @@ public class JobService extends BasicService implements JobSupporter, ISmartAppl } public String getExceptionCode(Output output) { - try { + try (SetLogCategory ignored = new SetLogCategory(LogConstant.BUILD_CATEGORY)) { var exceptionOrExceptionMessage = output.getFailedReason(); if (StringUtils.isBlank(exceptionOrExceptionMessage)) { @@ -889,7 +891,9 @@ public class JobService extends BasicService implements JobSupporter, ISmartAppl result.setSequenceID(stageBase.getStepId()); if (stageOutput == null) { - logger.warn("Cannot found output for task: id={}", stageBase.getId()); + try (SetLogCategory ignored = new 
SetLogCategory(LogConstant.BUILD_CATEGORY)) { + logger.warn("Cannot found output for task: id={}", stageBase.getId()); + } return result; } for (Map.Entry<String, String> entry : stageOutput.getExtra().entrySet()) { @@ -925,7 +929,9 @@ public class JobService extends BasicService implements JobSupporter, ISmartAppl result.setSequenceID(task.getStepId()); if (stepOutput == null) { - logger.warn("Cannot found output for task: id={}", task.getId()); + try (SetLogCategory ignored = new SetLogCategory(LogConstant.BUILD_CATEGORY)) { + logger.warn("Cannot found output for task: id={}", task.getId()); + } return result; } @@ -1018,7 +1024,9 @@ public class JobService extends BasicService implements JobSupporter, ISmartAppl } public void batchUpdateGlobalJobStatus(List<String> jobIds, String action, List<String> filterStatuses) { - logger.info("Owned projects is {}", projectService.getOwnedProjects()); + try (SetLogCategory ignored = new SetLogCategory(LogConstant.BUILD_CATEGORY)) { + logger.info("Owned projects is {}", projectService.getOwnedProjects()); + } for (String project : projectService.getOwnedProjects()) { aclEvaluate.checkProjectOperationPermission(project); batchUpdateJobStatus0(jobIds, project, action, filterStatuses); @@ -1252,7 +1260,9 @@ public class JobService extends BasicService implements JobSupporter, ISmartAppl project); FusionModel fusionModel = fusionModelManager.getFusionModel(modelId); if (!model.isFusionModel() || Objects.isNull(fusionModel)) { - logger.warn("model is not fusion model or fusion model is null, {}", modelId); + try (SetLogCategory ignored = new SetLogCategory(LogConstant.BUILD_CATEGORY)) { + logger.warn("model is not fusion model or fusion model is null, {}", modelId); + } return; } @@ -1370,7 +1380,9 @@ public class JobService extends BasicService implements JobSupporter, ISmartAppl @Override public void onApplicationEvent(ApplicationEvent event) { if (event instanceof ContextClosedEvent) { - logger.info("Stop kyligence node, kill 
job on yarn for yarn cluster mode"); + try (SetLogCategory ignored = new SetLogCategory(LogConstant.BUILD_CATEGORY)) { + logger.info("Stop kyligence node, kill job on yarn for yarn cluster mode"); + } EpochManager epochManager = EpochManager.getInstance(); KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); List<Epoch> ownedEpochs = epochManager.getOwnedEpochs(); diff --git a/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java b/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java index 0de349d774..c51fc1c35e 100644 --- a/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java +++ b/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java @@ -21,6 +21,7 @@ package org.apache.kylin.rest.service; import static org.apache.kylin.common.exception.code.ErrorCodeServer.JOB_ACTION_ILLEGAL; import static org.apache.kylin.common.exception.code.ErrorCodeServer.JOB_NOT_EXIST; import static org.apache.kylin.common.exception.code.ErrorCodeServer.JOB_STATUS_ILLEGAL; +import static org.apache.kylin.job.constant.JobStatusEnum.PENDING; import static org.apache.kylin.job.constant.JobStatusEnum.SKIP; import static org.awaitility.Awaitility.await; import static org.junit.Assert.assertEquals; @@ -917,7 +918,7 @@ public class JobServiceTest extends NLocalFileMetadataTestCase { List<ExecutableStepResponse> stages2 = subStages.get(segmentId2).getStage(); assertEquals(1, stages2.size()); ExecutableStepResponse logicStepResponse2 = stages2.get(0); - checkResponse(logicStepResponse2, logicStep.getId(), JobStatusEnum.PENDING); + checkResponse(logicStepResponse2, logicStep.getId(), PENDING); assertEquals(0, logicStepResponse2.getExecStartTime()); assertTrue(logicStepResponse2.getExecStartTime() < System.currentTimeMillis()); @@ -2034,4 +2035,13 @@ public class JobServiceTest extends NLocalFileMetadataTestCase { .get(0).getOutput().getStatus(), 
jobStatus); } } + + @Test + public void testParseToExecutableStepWithStepOutputNull() { + AbstractExecutable task = new FiveSecondSucceedTestExecutable(); + task.setProject("default"); + ExecutableState jobState = ExecutableState.RUNNING; + ExecutableStepResponse result = jobService.parseToExecutableStep(task, null, new HashMap<>(), jobState); + Assert.assertSame(PENDING, result.getStatus()); + } } diff --git a/src/kylin-it/src/test/java/org/apache/kylin/rest/broadcaster/BroadcasterTest.java b/src/kylin-it/src/test/java/org/apache/kylin/rest/broadcaster/BroadcasterTest.java index 1f267173ae..61fb0132b3 100644 --- a/src/kylin-it/src/test/java/org/apache/kylin/rest/broadcaster/BroadcasterTest.java +++ b/src/kylin-it/src/test/java/org/apache/kylin/rest/broadcaster/BroadcasterTest.java @@ -18,10 +18,12 @@ package org.apache.kylin.rest.broadcaster; +import java.io.IOException; import java.util.Arrays; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.persistence.transaction.AddS3CredentialToSparkBroadcastEventNotifier; +import org.apache.kylin.common.persistence.transaction.AuditLogBroadcastEventNotifier; import org.apache.kylin.common.persistence.transaction.BroadcastEventReadyNotifier; import org.apache.kylin.junit.annotation.MetadataInfo; import org.apache.kylin.rest.cluster.ClusterManager; @@ -30,6 +32,7 @@ import org.apache.kylin.rest.config.initialize.BroadcastListener; import org.apache.kylin.rest.security.AclPermission; import org.apache.kylin.rest.security.AdminUserSyncEventNotifier; import org.apache.kylin.rest.security.UserAclManager; +import org.apache.kylin.rest.service.AuditLogService; import org.apache.kylin.rest.service.UserAclService; import org.apache.kylin.rest.service.UserService; import org.apache.kylin.rest.util.SpringContext; @@ -51,6 +54,8 @@ import org.apache.kylin.metadata.epoch.EpochManager; import lombok.val; import lombok.extern.slf4j.Slf4j; +import static 
org.apache.kylin.common.persistence.transaction.BroadcastEventReadyNotifier.BroadcastScopeEnum.WHOLE_NODES; + @Slf4j @MetadataInfo(onlyProps = true) class BroadcasterTest { @@ -77,6 +82,22 @@ class BroadcasterTest { } } + @Test + void testBroadcastWithAnnounceContains() { + try (ConfigurableApplicationContext context = this.application.run("--kylin.server.mode=all")) { + SpringContext springContext = context.getBean(SpringContext.class); + ReflectionTestUtils.setField(springContext, "applicationContext", context); + Broadcaster broadcaster = context.getBean(Broadcaster.class); + + BroadcastEventReadyNotifier eventReadyNotifier = new BroadcastEventReadyNotifier(); + broadcaster.announce(eventReadyNotifier); + // announce twice + broadcaster.announce(eventReadyNotifier); + + Assertions.assertSame(WHOLE_NODES, eventReadyNotifier.getBroadcastScope()); + } + } + @Test void testBroadcastSyncAdminUserAcl() throws Exception { EpochManager epochManager = EpochManager.getInstance(); @@ -97,6 +118,20 @@ class BroadcasterTest { assert SparderEnv.getSparkSession().conf().contains(String.format("fs.s3a.bucket.%s.assumed.role.arn", "aa")); } + @Test + void testBroadcastWithAuditLog() { + BroadcastListener broadcastListener = new BroadcastListener(); + val auditLogService = Mockito.spy(AuditLogService.class); + ReflectionTestUtils.setField(broadcastListener, "auditLogService", auditLogService); + String errorMsg = ""; + try { + broadcastListener.handle(new AuditLogBroadcastEventNotifier()); + } catch (IOException e) { + errorMsg = e.getMessage(); + } + Assertions.assertTrue(errorMsg.isEmpty()); + } + @Configuration static class Config { @Bean diff --git a/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryCacheManager.java b/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryCacheManager.java index bdbce8d893..5f68795339 100644 --- a/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryCacheManager.java +++ 
b/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryCacheManager.java @@ -162,7 +162,7 @@ public class QueryCacheManager implements CommonQueryCacheSupporter { public SQLResponse doSearchQuery(QueryCacheManager.Type type, SQLRequest sqlRequest) { Object response = kylinCache.get(type.rootCacheName, sqlRequest.getProject(), sqlRequest.getCacheKey()); - logger.info("[query cache log] The cache key is: {}", sqlRequest.getCacheKey()); + logger.debug("[query cache log] The cache key is: {}", sqlRequest.getCacheKey()); if (response == null) { return null; } @@ -178,7 +178,7 @@ public class QueryCacheManager implements CommonQueryCacheSupporter { // check signature for success query resp in case the datasource is changed if (QueryCacheSignatureUtil.checkCacheExpired(cached, sqlRequest.getProject())) { - logger.info("[query cache log] cache has expired, cache key is {}", sqlRequest.getCacheKey()); + logger.debug("[query cache log] cache has expired, cache key is {}", sqlRequest.getCacheKey()); clearQueryCache(sqlRequest); return null; } @@ -315,10 +315,10 @@ public class QueryCacheManager implements CommonQueryCacheSupporter { public void clearProjectCache(String project) { if (project == null) { - logger.debug("[query cache log] clear query cache for all projects."); + logger.info("[query cache log] clear query cache for all projects."); kylinCache.clearAll(); } else { - logger.debug("[query cache log] clear query cache for {}", project); + logger.info("[query cache log] clear query cache for {}", project); kylinCache.clearByType(Type.SUCCESS_QUERY_CACHE.rootCacheName, project); kylinCache.clearByType(Type.EXCEPTION_QUERY_CACHE.rootCacheName, project); kylinCache.clearByType(Type.SCHEMA_CACHE.rootCacheName, project); diff --git a/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java b/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java index dcc26d6a51..04dad61a83 100644 --- 
a/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java +++ b/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java @@ -62,6 +62,7 @@ import org.apache.kylin.common.KapConfig; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.QueryContext; import org.apache.kylin.common.QueryTrace; +import org.apache.kylin.common.constant.LogConstant; import org.apache.kylin.common.debug.BackdoorToggles; import org.apache.kylin.common.exception.KylinException; import org.apache.kylin.common.exception.KylinTimeoutException; @@ -183,7 +184,7 @@ public class QueryService extends BasicService implements CacheSignatureQuerySup public static final String QUERY_STORE_PATH_PREFIX = "/query/"; private static final String JDBC_METADATA_SCHEMA = "metadata"; - private static final Logger logger = LoggerFactory.getLogger("query"); + private static final Logger logger = LoggerFactory.getLogger(LogConstant.QUERY_CATEGORY); final SlowQueryDetector slowQueryDetector = new SlowQueryDetector(); @Autowired @@ -298,6 +299,7 @@ public class QueryService extends BasicService implements CacheSignatureQuerySup if (QueryContext.current().getQueryTagInfo().isAsyncQuery() && NProjectManager.getProjectConfig(sqlRequest.getProject()).isUniqueAsyncQueryYarnQueue()) { + logger.info("This query is an async query in project: {}", sqlRequest.getProject()); if (StringUtils.isNotEmpty(sqlRequest.getSparkQueue())) { queryParams.setSparkQueue(sqlRequest.getSparkQueue()); } @@ -479,7 +481,8 @@ public class QueryService extends BasicService implements CacheSignatureQuerySup queryContext.setQueryId(UUID.fromString(sqlRequest.getQueryId()).toString()); } try (SetThreadName ignored = new SetThreadName("Query %s", queryContext.getQueryId()); - SetLogCategory ignored2 = new SetLogCategory("query")) { + SetLogCategory ignored2 = new SetLogCategory(LogConstant.QUERY_CATEGORY)) { + logger.info("Start query in project: {}", sqlRequest.getProject()); if 
(sqlRequest.getExecuteAs() != null) sqlRequest.setUsername(sqlRequest.getExecuteAs()); else @@ -606,6 +609,7 @@ public class QueryService extends BasicService implements CacheSignatureQuerySup QueryUtils.updateQueryContextSQLMetrics(rawSql.getStatementString()); QueryContext.currentTrace().amendLast(QueryTrace.PREPARE_AND_SUBMIT_JOB, System.currentTimeMillis()); QueryContext.currentTrace().endLastSpan(); + QueryContext.current().record("update_metrics_time"); QueryContext.currentMetrics().setQueryEndTime(System.currentTimeMillis()); sqlResponse.setServer(clusterManager.getLocalServer()); diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkMetadataExplorer.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkMetadataExplorer.java index 91cc8670bf..c9543e11e0 100644 --- a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkMetadataExplorer.java +++ b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkMetadataExplorer.java @@ -321,7 +321,7 @@ public class NSparkMetadataExplorer implements ISourceMetadataExplorer, ISampleD Database db = SparderEnv.getSparkSession().catalog().getDatabase(database); } catch (AnalysisException e) { UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); - logger.info("The current user: {} does not have permission to access database {}", ugi.getUserName(), + logger.error("The current user: {} does not have permission to access database {}", ugi.getUserName(), database); return false; } diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/SegmentFlatTable.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/SegmentFlatTable.scala index 29d38f641b..931570c6c9 100644 --- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/SegmentFlatTable.scala +++ 
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/SegmentFlatTable.scala @@ -18,12 +18,10 @@ package org.apache.kylin.engine.spark.builder -import java.util.concurrent.{CountDownLatch, TimeUnit} -import java.util.{Locale, Objects, Timer, TimerTask} - +import com.google.common.collect.Sets import org.apache.commons.lang3.StringUtils import org.apache.kylin.common.util.HadoopUtil -import org.apache.kylin.common.{KapConfig, KylinConfig} +import org.apache.kylin.common.{CustomUtils, KapConfig, KylinConfig} import org.apache.kylin.engine.spark.builder.DFBuilderHelper._ import org.apache.kylin.engine.spark.job.NSparkCubingUtil._ import org.apache.kylin.engine.spark.job.{FiltersUtil, TableMetaManager} @@ -38,6 +36,12 @@ import org.apache.spark.sql.functions.{col, expr} import org.apache.spark.sql.types.StructField import org.apache.spark.sql.util.SparderTypeUtil import org.apache.spark.utils.ProxyThreadUtils +import java.util.concurrent.{CountDownLatch, TimeUnit} +import java.util.{Locale, Objects, Timer, TimerTask} + +import org.apache.kylin.common.constant.LogConstant +import org.apache.kylin.common.logging.SetLogCategory +import org.apache.spark.util.Utils import scala.collection.JavaConverters._ import scala.collection.mutable @@ -46,8 +50,6 @@ import scala.concurrent.duration.{Duration, MILLISECONDS} import scala.concurrent.forkjoin.ForkJoinPool import scala.util.{Failure, Success, Try} -import com.google.common.collect.Sets - class SegmentFlatTable(private val sparkSession: SparkSession, // private val tableDesc: SegmentFlatTableDesc) extends LogEx { @@ -524,7 +526,9 @@ object SegmentFlatTable extends LogEx { val newFields = originDS.schema.fields.map(f => convertFromDot("`" + alias + "`" + "." 
+ "`" + f.name + "`")).toSeq val newDS = originDS.toDF(newFields: _*) - logInfo(s"Wrap ALIAS ${originDS.schema.treeString} TO ${newDS.schema.treeString}") + CustomUtils.tryWithResourceIgnore(new SetLogCategory(LogConstant.BUILD_CATEGORY)) { + _ => logInfo(s"Wrap ALIAS ${originDS.schema.treeString} TO ${newDS.schema.treeString}") + } newDS } @@ -557,7 +561,9 @@ object SegmentFlatTable extends LogEx { val equiConditionColPairs = fk.zip(pk).map(joinKey => col(convertFromDot(joinKey._1.getBackTickIdentity)) .equalTo(col(convertFromDot(joinKey._2.getBackTickIdentity)))) - logInfo(s"Lookup table schema ${lookupDataset.schema.treeString}") + CustomUtils.tryWithResourceIgnore(new SetLogCategory(LogConstant.BUILD_CATEGORY)) { + _ => logInfo(s"Lookup table schema ${lookupDataset.schema.treeString}") + } if (join.getNonEquiJoinCondition != null) { var condition = NonEquiJoinConditionBuilder.convert(join.getNonEquiJoinCondition) diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/RDSegmentBuildExec.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/RDSegmentBuildExec.scala index 269065599b..a2c272dec6 100644 --- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/RDSegmentBuildExec.scala +++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/RDSegmentBuildExec.scala @@ -80,11 +80,10 @@ class RDSegmentBuildExec(private val jobContext: RDSegmentBuildJob, // val paths = ResourceDetectUtils.getPaths(execution.sparkPlan).map(_.toString).asJava logInfo(s"Detected source: $sourceName $leaves ${paths.asScala.mkString(",")}") val startTime = System.currentTimeMillis() - logInfo(s"Detect source size start time is $startTime") - val resourceSize = ResourceDetectUtils.getResourceSize(SparderEnv.getHadoopConfiguration(),config.isConcurrencyFetchDataSourceSize, + val resourceSize = ResourceDetectUtils.getResourceSize(SparderEnv.getHadoopConfiguration(), 
config.isConcurrencyFetchDataSourceSize, paths.asScala.map(path => new Path(path)): _*) val endTime = System.currentTimeMillis() - logInfo(s"Detect source size end time is $endTime") + logInfo(s"Detect source size cost time is ${endTime - startTime} ms.") logInfo(s"Detect source size $resourceSize") sourceSize.put(sourceName, resourceSize) diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/SegmentBuildJob.java b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/SegmentBuildJob.java index b9663c8d26..6bbf5217c7 100644 --- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/SegmentBuildJob.java +++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/SegmentBuildJob.java @@ -95,7 +95,9 @@ public class SegmentBuildJob extends SegmentJob { @Override protected final void doExecute() throws Exception { + log.info("Start sub stage {}", REFRESH_SNAPSHOTS.name()); REFRESH_SNAPSHOTS.create(this, null, null).toWork(); + log.info("End sub stage {}", REFRESH_SNAPSHOTS.name()); buildContext = new BuildContext(getSparkSession().sparkContext(), config); buildContext.appStatusTracker().startMonitorBuildResourceState(); diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/exec/BuildExec.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/exec/BuildExec.scala index 70a01b35e4..91b2850c83 100644 --- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/exec/BuildExec.scala +++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/exec/BuildExec.scala @@ -19,13 +19,15 @@ package org.apache.kylin.engine.spark.job.exec import org.apache.kylin.engine.spark.job.stage.StageExec - import java.io.IOException import java.util import java.util.Locale + +import org.apache.spark.internal.Logging + import 
scala.collection.JavaConverters._ -class BuildExec(var id: String) { +class BuildExec(var id: String) extends Logging{ protected var subStages = new util.ArrayList[StageExec] def getId: String = { @@ -35,7 +37,9 @@ class BuildExec(var id: String) { @throws(classOf[IOException]) def buildSegment(): Unit = { for (stage <- subStages.asScala) { + logInfo(s"Start sub stage ${stage.getStageName}") stage.toWork() + logInfo(s"End sub stage ${stage.getStageName}") } } diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/exec/MergeExec.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/exec/MergeExec.scala index 498ef117d4..2b4bd57c94 100644 --- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/exec/MergeExec.scala +++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/exec/MergeExec.scala @@ -29,7 +29,9 @@ class MergeExec(id: String) extends BuildExec(id) { @throws(classOf[IOException]) def mergeSegment(): Unit = { for (stage <- subStages.asScala) { + logInfo(s"Start sub stage ${stage.getStageName}") stage.toWork() + logInfo(s"End sub stage ${stage.getStageName}") } } diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/exec/SnapshotExec.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/exec/SnapshotExec.scala index 4394a8615d..86dd500178 100644 --- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/exec/SnapshotExec.scala +++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/exec/SnapshotExec.scala @@ -29,7 +29,9 @@ class SnapshotExec(id: String) extends BuildExec(id) { @throws(classOf[IOException]) def buildSnapshot(): Unit = { for (stage <- subStages.asScala) { + logInfo(s"Start sub stage ${stage.getStageName}") stage.toWorkWithoutFinally() + logInfo(s"End sub stage ${stage.getStageName}") } } 
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/exec/TableAnalyzerExec.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/exec/TableAnalyzerExec.scala index d95607398d..aa0bb0d780 100644 --- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/exec/TableAnalyzerExec.scala +++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/exec/TableAnalyzerExec.scala @@ -27,7 +27,9 @@ class TableAnalyzerExec(id: String) extends BuildExec(id) { def analyzerTable(): Unit = { for (stage <- subStages.asScala) { + logInfo(s"Start sub stage ${stage.getStageName}") stage.toWorkWithoutFinally() + logInfo(s"End sub stage ${stage.getStageName}") } } diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/StageExec.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/StageExec.scala index 50de84001b..7f99feff77 100644 --- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/StageExec.scala +++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/StageExec.scala @@ -32,6 +32,8 @@ import java.util trait StageExec extends Logging { protected var id: String = _ + def getStageName: String + def getJobContext: SparkApplication def getDataSegment: NDataSegment diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/WaiteForResource.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/WaiteForResource.scala index c7111bd852..fd05f59b8a 100644 --- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/WaiteForResource.scala +++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/WaiteForResource.scala @@ -80,4 +80,6 @@ class WaiteForResource(jobContext: 
SparkApplication) extends StageExec { KylinBuildEnv.get().buildJobInfos.endWait() } } + + override def getStageName: String = "WaiteForResource" } diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/BuildDict.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/BuildDict.scala index 84f8454bae..ca4aaa2998 100644 --- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/BuildDict.scala +++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/BuildDict.scala @@ -31,4 +31,6 @@ class BuildDict(jobContext: SegmentJob, dataSegment: NDataSegment, buildParam: B val dict: Dataset[Row] = buildDictIfNeed() buildParam.setDict(dict) } + + override def getStageName: String = "BuildDict" } diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/BuildLayer.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/BuildLayer.scala index 734a4361e2..a8cf9dc237 100644 --- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/BuildLayer.scala +++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/BuildLayer.scala @@ -31,4 +31,6 @@ class BuildLayer(jobContext: SegmentJob, dataSegment: NDataSegment, buildParam: // Drain results immediately after building. 
drain() } + + override def getStageName: String = "BuildLayer" } diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/CostBasedPlanner.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/CostBasedPlanner.scala index 08ba41e46d..cef881284c 100644 --- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/CostBasedPlanner.scala +++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/CostBasedPlanner.scala @@ -50,4 +50,6 @@ class CostBasedPlanner(jobContext: SegmentJob, dataSegment: NDataSegment, buildP buildParam.setFlatTableDesc(flatTableDesc) } } + + override def getStageName: String = "CostBasedPlanner" } diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/FlatTableAndDictBase.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/FlatTableAndDictBase.scala index 38a191d461..24391d28a7 100644 --- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/FlatTableAndDictBase.scala +++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/FlatTableAndDictBase.scala @@ -48,6 +48,10 @@ import org.apache.spark.utils.ProxyThreadUtils import java.math.BigInteger import java.util.concurrent.{CountDownLatch, TimeUnit} import java.util.{Locale, Objects, Timer, TimerTask} + +import org.apache.kylin.common.constant.LogConstant +import org.apache.kylin.common.logging.SetLogCategory + import scala.collection.JavaConverters._ import scala.collection.mutable import scala.collection.parallel.ForkJoinTaskSupport @@ -607,6 +611,7 @@ abstract class FlatTableAndDictBase(private val jobContext: SegmentJob, private def buildDict(ds: Dataset[Row], dictCols: Set[TblColRef]): Unit = { if (config.isV2DictEnable) { + logInfo("Build v2 dict 
default.") var matchedCols = selectColumnsInTable(ds, dictCols) if (dataSegment.getIndexPlan.isSkipEncodeIntegerFamilyEnabled) { matchedCols = matchedCols.filterNot(_.getType.isIntegerFamily) diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/GatherFlatTableStats.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/GatherFlatTableStats.scala index 920e1b7bf6..6668593a42 100644 --- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/GatherFlatTableStats.scala +++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/GatherFlatTableStats.scala @@ -48,4 +48,6 @@ class GatherFlatTableStats(jobContext: SegmentJob, dataSegment: NDataSegment, bu // Cleanup previous potentially left temp layout data. cleanupLayoutTempData(dataSegment, readOnlyLayouts.asScala.toSeq) } + + override def getStageName: String = "GatherFlatTableStats" } diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/GenerateFlatTable.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/GenerateFlatTable.scala index f3b304a57d..ba62f0bfd9 100644 --- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/GenerateFlatTable.scala +++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/GenerateFlatTable.scala @@ -38,4 +38,6 @@ class GenerateFlatTable(jobContext: SegmentJob, dataSegment: NDataSegment, build onStageSkipped() } } + + override def getStageName: String = "GenerateFlatTable" } diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/MaterializedFactTableView.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/MaterializedFactTableView.scala index 
f467058559..632ae0cd1c 100644 --- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/MaterializedFactTableView.scala +++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/MaterializedFactTableView.scala @@ -54,4 +54,6 @@ class MaterializedFactTableView(jobContext: SegmentJob, dataSegment: NDataSegmen onStageSkipped() } } + + override def getStageName: String = "MaterializedFactTableView" } diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/RefreshColumnBytes.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/RefreshColumnBytes.scala index 328097df3e..48b63f122b 100644 --- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/RefreshColumnBytes.scala +++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/RefreshColumnBytes.scala @@ -35,4 +35,6 @@ class RefreshColumnBytes(jobContext: SegmentJob, dataSegment: NDataSegment, buil cleanup() logInfo(s"Finished SEGMENT $segmentId") } + + override def getStageName: String = "RefreshColumnBytes" } diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/RefreshSnapshots.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/RefreshSnapshots.scala index 4cfc8711b2..ef6518546b 100644 --- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/RefreshSnapshots.scala +++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/RefreshSnapshots.scala @@ -44,4 +44,6 @@ class RefreshSnapshots(jobContext: SegmentJob) extends StageExec { case _ => } } + + override def getStageName: String = "RefreshSnapshots" } diff --git 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionBuildDict.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionBuildDict.scala index ab0d070b52..23e834f085 100644 --- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionBuildDict.scala +++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionBuildDict.scala @@ -30,4 +30,6 @@ class PartitionBuildDict(jobContext: SegmentJob, dataSegment: NDataSegment, buil val dict: Dataset[Row] = buildDictIfNeed() buildParam.setDict(dict) } + + override def getStageName: String = "PartitionBuildDict" } diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionBuildLayer.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionBuildLayer.scala index 7a43766be0..995bb588cc 100644 --- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionBuildLayer.scala +++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionBuildLayer.scala @@ -30,4 +30,6 @@ class PartitionBuildLayer(jobContext: SegmentJob, dataSegment: NDataSegment, bui // Drain results immediately after building. 
drain() } + + override def getStageName: String = "PartitionBuildLayer" } diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionCostBasedPlanner.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionCostBasedPlanner.scala index 3aac410594..7b3c1bf586 100644 --- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionCostBasedPlanner.scala +++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionCostBasedPlanner.scala @@ -55,4 +55,6 @@ class PartitionCostBasedPlanner(jobContext: SegmentJob, dataSegment: NDataSegmen buildParam.setTableDesc(tableDesc) } } + + override def getStageName: String = "PartitionCostBasedPlanner" } diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionGatherFlatTableStats.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionGatherFlatTableStats.scala index 43afbb100e..103c02c32b 100644 --- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionGatherFlatTableStats.scala +++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionGatherFlatTableStats.scala @@ -75,4 +75,5 @@ class PartitionGatherFlatTableStats(jobContext: SegmentJob, dataSegment: NDataSe } } + override def getStageName: String = "PartitionGatherFlatTableStats" } diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionGenerateFlatTable.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionGenerateFlatTable.scala index b42b2a3956..bf406ac72e 100644 --- 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionGenerateFlatTable.scala +++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionGenerateFlatTable.scala @@ -37,4 +37,6 @@ class PartitionGenerateFlatTable(jobContext: SegmentJob, dataSegment: NDataSegme onStageSkipped() } } + + override def getStageName: String = "PartitionGenerateFlatTable" } diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionMaterializedFactTableView.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionMaterializedFactTableView.scala index e8f2fca99f..deb1c604bb 100644 --- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionMaterializedFactTableView.scala +++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionMaterializedFactTableView.scala @@ -61,4 +61,6 @@ class PartitionMaterializedFactTableView(jobContext: SegmentJob, dataSegment: ND onStageSkipped() } } + + override def getStageName: String = "PartitionMaterializedFactTableView" } diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionRefreshColumnBytes.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionRefreshColumnBytes.scala index c8e26660e2..86ef005167 100644 --- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionRefreshColumnBytes.scala +++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionRefreshColumnBytes.scala @@ -34,4 +34,6 @@ class PartitionRefreshColumnBytes(jobContext: SegmentJob, dataSegment: 
NDataSegm cleanup() logInfo(s"Finished SEGMENT $segmentId") } + + override def getStageName: String = "PartitionRefreshColumnBytes" } diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/merge/MergeColumnBytes.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/merge/MergeColumnBytes.scala index 3091e92e0a..1aa160e8f7 100644 --- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/merge/MergeColumnBytes.scala +++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/merge/MergeColumnBytes.scala @@ -29,4 +29,6 @@ class MergeColumnBytes(jobContext: SegmentJob, dataSegment: NDataSegment) cleanup() } + + override def getStageName: String = "MergeColumnBytes" } diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/merge/MergeFlatTable.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/merge/MergeFlatTable.scala index 58c5edbcbf..f525cfec82 100644 --- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/merge/MergeFlatTable.scala +++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/merge/MergeFlatTable.scala @@ -29,4 +29,6 @@ class MergeFlatTable(jobContext: SegmentJob, dataSegment: NDataSegment) mergeFlatTable() } + + override def getStageName: String = "MergeFlatTable" } diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/merge/MergeIndices.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/merge/MergeIndices.scala index 66956fafe6..9812131e51 100644 --- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/merge/MergeIndices.scala +++ 
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/merge/MergeIndices.scala @@ -29,4 +29,6 @@ class MergeIndices(jobContext: SegmentJob, dataSegment: NDataSegment) // Drain results immediately after merging. drain() } + + override def getStageName: String = "MergeIndices" } diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/merge/partition/PartitionMergeColumnBytes.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/merge/partition/PartitionMergeColumnBytes.scala index 4f52f70781..2de190be1d 100644 --- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/merge/partition/PartitionMergeColumnBytes.scala +++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/merge/partition/PartitionMergeColumnBytes.scala @@ -29,4 +29,6 @@ class PartitionMergeColumnBytes(jobContext: SegmentJob, dataSegment: NDataSegmen cleanup() } + + override def getStageName: String = "PartitionMergeColumnBytes" } diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/merge/partition/PartitionMergeFlatTable.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/merge/partition/PartitionMergeFlatTable.scala index 8fb98af061..5ca128dceb 100644 --- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/merge/partition/PartitionMergeFlatTable.scala +++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/merge/partition/PartitionMergeFlatTable.scala @@ -29,4 +29,6 @@ class PartitionMergeFlatTable(jobContext: SegmentJob, dataSegment: NDataSegment) mergeFlatTable() } + + override def getStageName: String = "PartitionMergeFlatTable" } diff --git 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/merge/partition/PartitionMergeIndices.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/merge/partition/PartitionMergeIndices.scala index f18f264aed..d51ff5c7ac 100644 --- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/merge/partition/PartitionMergeIndices.scala +++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/merge/partition/PartitionMergeIndices.scala @@ -29,4 +29,6 @@ class PartitionMergeIndices(jobContext: SegmentJob, dataSegment: NDataSegment) // Drain results immediately after merging. drain() } + + override def getStageName: String = "PartitionMergeIndices" } diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/snapshots/SnapshotsBuild.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/snapshots/SnapshotsBuild.scala index 77f8bb5bb9..de0d8dd1cc 100644 --- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/snapshots/SnapshotsBuild.scala +++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/snapshots/SnapshotsBuild.scala @@ -33,4 +33,6 @@ class SnapshotsBuild(jobContext: SnapshotBuildJob) extends StageExec { override def execute(): Unit = { jobContext.buildSnapshot() } + + override def getStageName: String = "SnapshotsBuild" } diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/tablesampling/AnalyzerTable.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/tablesampling/AnalyzerTable.scala index e12e44344a..f215383531 100644 --- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/tablesampling/AnalyzerTable.scala +++ 
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/tablesampling/AnalyzerTable.scala @@ -33,4 +33,6 @@ class AnalyzerTable(jobContext: TableAnalyzerJob) extends StageExec { override def execute(): Unit = { jobContext.analyzerTable() } + + override def getStageName: String = "AnalyzerTable" } diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/snapshots/SnapshotsBuild.scala b/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/job/stage/WaiteForResourceTest.scala similarity index 62% copy from src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/snapshots/SnapshotsBuild.scala copy to src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/job/stage/WaiteForResourceTest.scala index 77f8bb5bb9..7d06e5dfc0 100644 --- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/snapshots/SnapshotsBuild.scala +++ b/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/job/stage/WaiteForResourceTest.scala @@ -16,21 +16,19 @@ * limitations under the License. 
*/ -package org.apache.kylin.engine.spark.job.stage.snapshots +package org.apache.kylin.engine.spark.job.stage import org.apache.kylin.engine.spark.application.SparkApplication -import org.apache.kylin.engine.spark.job.SnapshotBuildJob -import org.apache.kylin.engine.spark.job.stage.StageExec -import org.apache.kylin.metadata.cube.model.NDataSegment +import org.junit.Assert +import org.mockito.Mockito +import org.scalatest.funsuite.AnyFunSuite -class SnapshotsBuild(jobContext: SnapshotBuildJob) extends StageExec { - override def getJobContext: SparkApplication = jobContext +class WaiteForResourceTest extends AnyFunSuite{ - override def getDataSegment: NDataSegment = null + test("test WaiteForResource getStageName") { + val sparkApplication = Mockito.mock(classOf[SparkApplication]) - override def getSegmentId: String = jobContext.getJobId - - override def execute(): Unit = { - jobContext.buildSnapshot() + val waiteForResource = new WaiteForResource(sparkApplication) + Assert.assertEquals("WaiteForResource", waiteForResource.getStageName) } } diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/GatherFlatTableStats.scala b/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/job/stage/build/RefreshColumnBytesTest.scala similarity index 52% copy from src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/GatherFlatTableStats.scala copy to src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/job/stage/build/RefreshColumnBytesTest.scala index 920e1b7bf6..d6f0b87c1a 100644 --- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/GatherFlatTableStats.scala +++ b/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/job/stage/build/RefreshColumnBytesTest.scala @@ -21,31 +21,26 @@ package org.apache.kylin.engine.spark.job.stage.build import 
org.apache.kylin.engine.spark.job.SegmentJob import org.apache.kylin.engine.spark.job.stage.BuildParam import org.apache.kylin.metadata.cube.model.NDataSegment +import org.apache.kylin.metadata.model.NDataModel +import org.junit.Assert +import org.mockito.Mockito +import org.scalatest.funsuite.AnyFunSuite -import scala.collection.JavaConverters._ +import com.google.common.collect.ImmutableBiMap -class GatherFlatTableStats(jobContext: SegmentJob, dataSegment: NDataSegment, buildParam: BuildParam) - extends BuildStage(jobContext, dataSegment, buildParam) { +class RefreshColumnBytesTest extends AnyFunSuite { - override def execute(): Unit = { - scheduleCheckpoint() + test("test RefreshColumnBytes getStageName") { + val segmentJob = Mockito.mock(classOf[SegmentJob]) + val dataSegment = Mockito.mock(classOf[NDataSegment]) + val buildParam = Mockito.mock(classOf[BuildParam]) - // Build flat table? - if (buildParam.getSpanningTree.fromFlatTable()) { - // Collect statistics for flat table. - val statistics = buildStatistics() - buildParam.setFlatTableStatistics(statistics) + val dataModel = Mockito.mock(classOf[NDataModel]) + Mockito.when(dataSegment.getModel).thenReturn(dataModel) + val builder: ImmutableBiMap.Builder[Integer, NDataModel.Measure] = ImmutableBiMap.builder(); + Mockito.when(dataSegment.getModel.getEffectiveMeasures).thenReturn(builder.build()) - // Build inferior flat table. - if (config.isInferiorFlatTableEnabled) { - buildInferior() - } - } - - // Build root node's layout sanity cache. - buildSanityCache() - - // Cleanup previous potentially left temp layout data. 
- cleanupLayoutTempData(dataSegment, readOnlyLayouts.asScala.toSeq) + val mergeColumnBytes = new RefreshColumnBytes(segmentJob, dataSegment, buildParam) + Assert.assertEquals("RefreshColumnBytes", mergeColumnBytes.getStageName) } } diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/BuildLayer.scala b/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/job/stage/build/RefreshSnapshotsTest.scala similarity index 69% copy from src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/BuildLayer.scala copy to src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/job/stage/build/RefreshSnapshotsTest.scala index 734a4361e2..82931d40ca 100644 --- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/BuildLayer.scala +++ b/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/job/stage/build/RefreshSnapshotsTest.scala @@ -19,16 +19,16 @@ package org.apache.kylin.engine.spark.job.stage.build import org.apache.kylin.engine.spark.job.SegmentJob -import org.apache.kylin.engine.spark.job.stage.BuildParam -import org.apache.kylin.metadata.cube.model.NDataSegment +import org.junit.Assert +import org.mockito.Mockito +import org.scalatest.funsuite.AnyFunSuite -class BuildLayer(jobContext: SegmentJob, dataSegment: NDataSegment, buildParam: BuildParam) - extends BuildStage(jobContext, dataSegment, buildParam) { +class RefreshSnapshotsTest extends AnyFunSuite { - override def execute(): Unit = { - // Build layers. - buildLayouts() - // Drain results immediately after building. 
- drain() + test("test RefreshSnapshots getStageName") { + val segmentJob = Mockito.mock(classOf[SegmentJob]) + + val refreshSnapshots = new RefreshSnapshots(segmentJob) + Assert.assertEquals("RefreshSnapshots", refreshSnapshots.getStageName) } } diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionGenerateFlatTable.scala b/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionRefreshColumnBytesTest.scala similarity index 51% copy from src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionGenerateFlatTable.scala copy to src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionRefreshColumnBytesTest.scala index b42b2a3956..ee95311a9d 100644 --- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionGenerateFlatTable.scala +++ b/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionRefreshColumnBytesTest.scala @@ -21,20 +21,26 @@ package org.apache.kylin.engine.spark.job.stage.build.partition import org.apache.kylin.engine.spark.job.SegmentJob import org.apache.kylin.engine.spark.job.stage.BuildParam import org.apache.kylin.metadata.cube.model.NDataSegment -import org.apache.spark.sql.{Dataset, Row} +import org.apache.kylin.metadata.model.NDataModel +import org.junit.Assert +import org.mockito.Mockito +import org.scalatest.funsuite.AnyFunSuite -class PartitionGenerateFlatTable(jobContext: SegmentJob, dataSegment: NDataSegment, buildParam: BuildParam) - extends PartitionFlatTableAndDictBase(jobContext, dataSegment, buildParam) { - override def execute(): Unit = { - val flatTable: Dataset[Row] = generateFlatTable() - buildParam.setFlatTable(flatTable) - val flatTablePart: Dataset[Row] = generateFlatTablePart() - 
buildParam.setFlatTablePart(flatTablePart) +import com.google.common.collect.ImmutableBiMap - buildParam.setPartitionFlatTable(this) +class PartitionRefreshColumnBytesTest extends AnyFunSuite { - if (buildParam.isSkipGenerateFlatTable) { - onStageSkipped() - } + test("test PartitionRefreshColumnBytes getStageName") { + val segmentJob = Mockito.mock(classOf[SegmentJob]) + val dataSegment = Mockito.mock(classOf[NDataSegment]) + val buildParam = Mockito.mock(classOf[BuildParam]) + + val dataModel = Mockito.mock(classOf[NDataModel]) + Mockito.when(dataSegment.getModel).thenReturn(dataModel) + val builder: ImmutableBiMap.Builder[Integer, NDataModel.Measure] = ImmutableBiMap.builder(); + Mockito.when(dataSegment.getModel.getEffectiveMeasures).thenReturn(builder.build()) + + val partitionRefreshColumnBytes = new PartitionRefreshColumnBytes(segmentJob, dataSegment, buildParam) + Assert.assertEquals("PartitionRefreshColumnBytes", partitionRefreshColumnBytes.getStageName) } } diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/merge/MergeColumnBytes.scala b/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/job/stage/merge/MergeColumnBytesTest.scala similarity index 61% copy from src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/merge/MergeColumnBytes.scala copy to src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/job/stage/merge/MergeColumnBytesTest.scala index 3091e92e0a..28c0b0ccba 100644 --- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/merge/MergeColumnBytes.scala +++ b/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/job/stage/merge/MergeColumnBytesTest.scala @@ -20,13 +20,21 @@ package org.apache.kylin.engine.spark.job.stage.merge import org.apache.kylin.engine.spark.job.SegmentJob import org.apache.kylin.metadata.cube.model.NDataSegment +import 
org.apache.kylin.metadata.model.NDataModel +import org.junit.Assert +import org.mockito.Mockito +import org.scalatest.funsuite.AnyFunSuite -class MergeColumnBytes(jobContext: SegmentJob, dataSegment: NDataSegment) - extends MergeStage(jobContext, dataSegment) { +class MergeColumnBytesTest extends AnyFunSuite { - override def execute(): Unit = { - mergeColumnBytes() + test("test MergeColumnBytes getStageName") { + val segmentJob = Mockito.mock(classOf[SegmentJob]) + val dataSegment = Mockito.mock(classOf[NDataSegment]) + val dataModel = Mockito.mock(classOf[NDataModel]) - cleanup() + Mockito.when(dataSegment.getModel).thenReturn(dataModel) + + val mergeColumnBytes = new MergeColumnBytes(segmentJob, dataSegment) + Assert.assertEquals("MergeColumnBytes", mergeColumnBytes.getStageName) } } diff --git a/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/job/stage/merge/MergeStageTest.scala b/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/job/stage/merge/MergeStageTest.scala index 2e23a6f8c7..88b15ec3ee 100644 --- a/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/job/stage/merge/MergeStageTest.scala +++ b/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/job/stage/merge/MergeStageTest.scala @@ -51,6 +51,8 @@ class MergeStageTest extends AnyFunSuite with LocalMetadata { override def execute(): Unit = {} override def getUnmergedFTPaths: Seq[Path] = super.getUnmergedFTPaths + + override def getStageName: String = "MergeStageMock" } def testGetUnmergedFTPaths(config: KylinConfig): Unit = { diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/merge/partition/PartitionMergeColumnBytes.scala b/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/job/stage/merge/partition/PartitionMergeColumnBytesTest.scala similarity index 59% copy from 
src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/merge/partition/PartitionMergeColumnBytes.scala copy to src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/job/stage/merge/partition/PartitionMergeColumnBytesTest.scala index 4f52f70781..cfaf90a59e 100644 --- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/merge/partition/PartitionMergeColumnBytes.scala +++ b/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/job/stage/merge/partition/PartitionMergeColumnBytesTest.scala @@ -20,13 +20,21 @@ package org.apache.kylin.engine.spark.job.stage.merge.partition import org.apache.kylin.engine.spark.job.SegmentJob import org.apache.kylin.metadata.cube.model.NDataSegment +import org.apache.kylin.metadata.model.NDataModel +import org.junit.Assert +import org.mockito.Mockito +import org.scalatest.funsuite.AnyFunSuite -class PartitionMergeColumnBytes(jobContext: SegmentJob, dataSegment: NDataSegment) - extends PartitionMergeStage(jobContext, dataSegment) { +class PartitionMergeColumnBytesTest extends AnyFunSuite { - override def execute(): Unit = { - mergeColumnBytes() + test("test PartitionMergeColumnBytes getStageName") { + val segmentJob = Mockito.mock(classOf[SegmentJob]) + val dataSegment = Mockito.mock(classOf[NDataSegment]) + val dataModel = Mockito.mock(classOf[NDataModel]) - cleanup() + Mockito.when(dataSegment.getModel).thenReturn(dataModel) + + val partitionMergeColumnBytes = new PartitionMergeColumnBytes(segmentJob, dataSegment) + Assert.assertEquals("PartitionMergeColumnBytes", partitionMergeColumnBytes.getStageName) } } diff --git a/src/spark-project/engine-spark/src/test/scala/org/apache/spark/utils/TestResourceUtils.scala b/src/spark-project/engine-spark/src/test/scala/org/apache/spark/utils/TestResourceUtils.scala index 59bf9108b2..641f9ca70c 100644 --- 
a/src/spark-project/engine-spark/src/test/scala/org/apache/spark/utils/TestResourceUtils.scala +++ b/src/spark-project/engine-spark/src/test/scala/org/apache/spark/utils/TestResourceUtils.scala @@ -56,6 +56,12 @@ class TestResourceUtils extends SparderBaseFunSuite with BeforeAndAfterEach { // test case: available(10, 10) executor(20, 10) driver(1, 1) test("checkResource return false when available memory does not meet acquirement") { + // Without this may cause NPE + KylinBuildEnv.clean() + val config: KylinConfig = Mockito.mock(classOf[KylinConfig]) + Mockito.when(config.getMaxAllocationResourceProportion).thenReturn(0.9) + KylinBuildEnv.getOrCreate(config) + Mockito.when(fetcher.fetchMaximumResourceAllocation).thenReturn(ResourceInfo(Integer.MAX_VALUE, Integer.MAX_VALUE)) val conf = new SparkConf() conf.set(EXECUTOR_INSTANCES, "5") conf.set(EXECUTOR_MEMORY, "2MB") diff --git a/src/spark-project/spark-common/src/main/java/org/apache/kylin/common/asyncprofiler/AsyncProfilerUtils.java b/src/spark-project/spark-common/src/main/java/org/apache/kylin/common/asyncprofiler/AsyncProfilerUtils.java index cd74a7cd33..490000d745 100644 --- a/src/spark-project/spark-common/src/main/java/org/apache/kylin/common/asyncprofiler/AsyncProfilerUtils.java +++ b/src/spark-project/spark-common/src/main/java/org/apache/kylin/common/asyncprofiler/AsyncProfilerUtils.java @@ -66,7 +66,7 @@ public class AsyncProfilerUtils { if (!cachedResult.await(resultCollectionTimeout, TimeUnit.MILLISECONDS)) { logger.warn("timeout while waiting for profile result"); } - logger.debug("profiler stopped and result dumped to $localCacheDir"); + logger.debug("profiler stopped and result dumped to {}", localCacheDir); ZipFileUtils.compressZipFile(localCacheDir.getAbsolutePath(), outStream); } diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/merge/MergeColumnBytes.scala b/src/spark-project/spark-common/src/main/scala/org/apache/kylin/common/CustomUtils.scala 
similarity index 69% copy from src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/merge/MergeColumnBytes.scala copy to src/spark-project/spark-common/src/main/scala/org/apache/kylin/common/CustomUtils.scala index 3091e92e0a..b60fca9912 100644 --- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/merge/MergeColumnBytes.scala +++ b/src/spark-project/spark-common/src/main/scala/org/apache/kylin/common/CustomUtils.scala @@ -16,17 +16,16 @@ * limitations under the License. */ -package org.apache.kylin.engine.spark.job.stage.merge +package org.apache.kylin.common -import org.apache.kylin.engine.spark.job.SegmentJob -import org.apache.kylin.metadata.cube.model.NDataSegment +object CustomUtils { -class MergeColumnBytes(jobContext: SegmentJob, dataSegment: NDataSegment) - extends MergeStage(jobContext, dataSegment) { - - override def execute(): Unit = { - mergeColumnBytes() - - cleanup() + // Somehow equivalent to Java's try with resource, handle cannot be null + def tryWithResourceIgnore[T <: AutoCloseable](handle: T)(func: T => Any): Unit = { + try { + func(handle) + } finally { + handle.close() + } } } diff --git a/src/tool/src/main/java/org/apache/kylin/tool/KylinLogTool.java b/src/tool/src/main/java/org/apache/kylin/tool/KylinLogTool.java index 54736982ed..612e6dc7ce 100644 --- a/src/tool/src/main/java/org/apache/kylin/tool/KylinLogTool.java +++ b/src/tool/src/main/java/org/apache/kylin/tool/KylinLogTool.java @@ -93,7 +93,7 @@ public class KylinLogTool { private static final String SYSTEM_PROPERTIES = "System Properties"; private static final Set<String> kylinLogPrefix = Sets.newHashSet("kylin.log", "kylin.schedule.log", - "kylin.query.log", "kylin.smart.log", "kylin.build.log", "kylin.security.log"); + "kylin.query.log", "kylin.smart.log", "kylin.build.log", "kylin.security.log", "kylin.metadata.log"); private static final Set<String> queryDiagExcludedLogs = Sets.newHashSet("kylin.log", 
"kylin.schedule.log", "kylin.smart.log", "kylin.build.log", "kylin.security.log");