This is an automated email from the ASF dual-hosted git repository. xxyu pushed a commit to branch kylin5 in repository https://gitbox.apache.org/repos/asf/kylin.git
commit db02b7f93b74b52662466983660bfda59d0a5dd9 Author: luoping.zhang <luoping.zh...@kyligence.io> AuthorDate: Fri May 12 17:23:18 2023 +0800 KYLIN-5690 Optimize the Kylin read metadata for maxId --- .../apache/kylin/rest/config/AppInitializer.java | 18 +++++++- .../service/task/QueryHistoryTaskScheduler.java | 52 ++++++++-------------- .../task/QueryHistoryTaskSchedulerTest.java | 9 +--- .../kylin/metadata/epoch/EpochOrchestrator.java | 6 ++- 4 files changed, 42 insertions(+), 43 deletions(-) diff --git a/src/common-service/src/main/java/org/apache/kylin/rest/config/AppInitializer.java b/src/common-service/src/main/java/org/apache/kylin/rest/config/AppInitializer.java index f36aa23f26..9f83f39451 100644 --- a/src/common-service/src/main/java/org/apache/kylin/rest/config/AppInitializer.java +++ b/src/common-service/src/main/java/org/apache/kylin/rest/config/AppInitializer.java @@ -38,6 +38,7 @@ import org.apache.kylin.common.util.AddressUtil; import org.apache.kylin.common.util.HostInfoFetcher; import org.apache.kylin.engine.spark.filter.QueryFiltersCollector; import org.apache.kylin.engine.spark.utils.SparkJobFactoryUtils; +import org.apache.kylin.metadata.epoch.EpochManager; import org.apache.kylin.metadata.epoch.EpochOrchestrator; import org.apache.kylin.metadata.project.NProjectLoader; import org.apache.kylin.metadata.project.NProjectManager; @@ -55,6 +56,7 @@ import org.apache.kylin.rest.config.initialize.SparderStartEvent; import org.apache.kylin.rest.config.initialize.TableSchemaChangeListener; import org.apache.kylin.rest.config.initialize.UserAclListener; import org.apache.kylin.rest.service.CommonQueryCacheSupporter; +import org.apache.kylin.rest.service.task.QueryHistoryTaskScheduler; import org.apache.kylin.rest.util.JStackDumpTask; import org.apache.kylin.streaming.jobs.StreamingJobListener; import org.apache.kylin.tool.daemon.KapGuardianHATask; @@ -175,12 +177,24 @@ public class AppInitializer { log.info("The system cache is warmed up."); } + private void resetProjectOffsetId() { + KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); + List<ProjectInstance> prjInstances = NProjectManager.getInstance(kylinConfig).listAllProjects(); + EpochManager epochManager = EpochManager.getInstance(); + prjInstances.forEach(project -> { + if (epochManager.checkEpochOwner(project.getName())) { + QueryHistoryTaskScheduler.getInstance(project.getName()).resetOffsetId(); + } + }); + } + @EventListener(ApplicationReadyEvent.class) public void afterReady(ApplicationReadyEvent ignoredEvent) { val kylinConfig = KylinConfig.getInstanceFromEnv(); setFsUrlStreamHandlerFactory(); if (kylinConfig.isJobNode()) { new EpochOrchestrator(kylinConfig); + resetProjectOffsetId(); } if (kylinConfig.getJStackDumpTaskEnabled()) { taskScheduler.scheduleAtFixedRate(new JStackDumpTask(), @@ -193,7 +207,9 @@ public class AppInitializer { kylinConfig.getGuardianHACheckInterval() * Constant.SECOND); } - taskScheduler.scheduleAtFixedRate(new ProjectSerialEventBus.TimingDispatcher(), ProjectSerialEventBus.TimingDispatcher.INTERVAL); + taskScheduler.scheduleAtFixedRate(new ProjectSerialEventBus.TimingDispatcher(), + ProjectSerialEventBus.TimingDispatcher.INTERVAL); + } private void postInit() { diff --git a/src/common-service/src/main/java/org/apache/kylin/rest/service/task/QueryHistoryTaskScheduler.java b/src/common-service/src/main/java/org/apache/kylin/rest/service/task/QueryHistoryTaskScheduler.java index 759b2db724..86e5d286ba 100644 --- a/src/common-service/src/main/java/org/apache/kylin/rest/service/task/QueryHistoryTaskScheduler.java +++ b/src/common-service/src/main/java/org/apache/kylin/rest/service/task/QueryHistoryTaskScheduler.java @@ -130,6 +130,21 @@ public class QueryHistoryTaskScheduler { log.info("Query history task scheduler is started for [{}] ", project); } + public void resetOffsetId() { + EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> { + long maxId = queryHistoryDAO.getQueryHistoryMaxId(project); + KylinConfig config = KylinConfig.getInstanceFromEnv(); + QueryHistoryIdOffsetManager manager = QueryHistoryIdOffsetManager.getInstance(config, project); + QueryHistoryIdOffset queryHistoryIdOffset = manager.get(); + if (queryHistoryIdOffset.getOffset() > maxId || queryHistoryIdOffset.getStatMetaUpdateOffset() > maxId) { + queryHistoryIdOffset.setOffset(maxId); + queryHistoryIdOffset.setStatMetaUpdateOffset(maxId); + manager.save(queryHistoryIdOffset); + } + return 0; + }, project); + } + public Future scheduleImmediately(QueryHistoryTask runner) { return taskScheduler.schedule(runner, 10L, TimeUnit.SECONDS); } @@ -174,10 +189,8 @@ public class QueryHistoryTaskScheduler { protected List<QueryHistory> getQueryHistories(int batchSize) { QueryHistoryIdOffsetManager qhIdOffsetManager = QueryHistoryIdOffsetManager .getInstance(KylinConfig.getInstanceFromEnv(), project); - List<QueryHistory> queryHistoryList = queryHistoryDAO.queryQueryHistoriesByIdOffset( - qhIdOffsetManager.get().getStatMetaUpdateOffset(), batchSize, project); - resetIdOffset(queryHistoryList); - return queryHistoryList; + return queryHistoryDAO.queryQueryHistoriesByIdOffset(qhIdOffsetManager.get().getStatMetaUpdateOffset(), + batchSize, project); } @Override @@ -350,10 +363,8 @@ public class QueryHistoryTaskScheduler { protected List<QueryHistory> getQueryHistories(int batchSize) { QueryHistoryIdOffsetManager qhIdOffsetManager = QueryHistoryIdOffsetManager .getInstance(KylinConfig.getInstanceFromEnv(), project); - List<QueryHistory> queryHistoryList = queryHistoryDAO - .queryQueryHistoriesByIdOffset(qhIdOffsetManager.get().getOffset(), batchSize, project); - resetIdOffset(queryHistoryList); - return queryHistoryList; + return queryHistoryDAO.queryQueryHistoriesByIdOffset(qhIdOffsetManager.get().getOffset(), batchSize, + project); } @Override @@ -428,31 +439,6 @@ public class QueryHistoryTaskScheduler { protected abstract String name(); - private volatile boolean needResetOffset = true; - - protected void resetIdOffset(List<QueryHistory> queryHistories) { - if (needResetOffset && CollectionUtils.isEmpty(queryHistories)) { - long maxId = queryHistoryDAO.getQueryHistoryMaxId(project); - resetIdOffset(maxId); - } - } - - private void resetIdOffset(long maxId) { - EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> { - KylinConfig config = KylinConfig.getInstanceFromEnv(); - QueryHistoryIdOffsetManager manager = QueryHistoryIdOffsetManager.getInstance(config, project); - QueryHistoryIdOffset queryHistoryIdOffset = manager.get(); - if (queryHistoryIdOffset.getOffset() > maxId - || queryHistoryIdOffset.getStatMetaUpdateOffset() > maxId) { - queryHistoryIdOffset.setOffset(maxId); - queryHistoryIdOffset.setStatMetaUpdateOffset(maxId); - manager.save(queryHistoryIdOffset); - } - needResetOffset = false; - return 0; - }, project); - } - public void batchHandle(int batchSize, int maxSize, Consumer<List<QueryHistory>> consumer) { if (!(batchSize > 0 && maxSize >= batchSize)) { throw new IllegalArgumentException(String.format(Locale.ROOT, diff --git a/src/common-service/src/test/java/org/apache/kylin/rest/service/task/QueryHistoryTaskSchedulerTest.java b/src/common-service/src/test/java/org/apache/kylin/rest/service/task/QueryHistoryTaskSchedulerTest.java index 9cff224d88..9e70136557 100644 --- a/src/common-service/src/test/java/org/apache/kylin/rest/service/task/QueryHistoryTaskSchedulerTest.java +++ b/src/common-service/src/test/java/org/apache/kylin/rest/service/task/QueryHistoryTaskSchedulerTest.java @@ -150,9 +150,7 @@ public class QueryHistoryTaskSchedulerTest extends NLocalFileMetadataTestCase { idOffsetManager.save(queryHistoryIdOffset); Assert.assertEquals(999L, idOffsetManager.get().getOffset()); // run update - QueryHistoryTaskScheduler.QueryHistoryAccelerateRunner queryHistoryAccelerateRunner = // - qhAccelerateScheduler.new QueryHistoryAccelerateRunner(false); - queryHistoryAccelerateRunner.run(); + qhAccelerateScheduler.resetOffsetId(); // after auto reset offset Assert.assertEquals(0L, idOffsetManager.get().getOffset()); } @@ -173,10 +171,7 @@ public class QueryHistoryTaskSchedulerTest extends NLocalFileMetadataTestCase { Assert.assertEquals(9L, idOffsetManager.get().getOffset()); // run update - QueryHistoryTaskScheduler.QueryHistoryAccelerateRunner queryHistoryAccelerateRunner = // - qhAccelerateScheduler.new QueryHistoryAccelerateRunner(false); - queryHistoryAccelerateRunner.run(); - // after auto reset offset + qhAccelerateScheduler.resetOffsetId(); Assert.assertEquals(9L, idOffsetManager.get().getOffset()); } diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/epoch/EpochOrchestrator.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/epoch/EpochOrchestrator.java index 62dfdbec88..d11884abfd 100644 --- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/epoch/EpochOrchestrator.java +++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/epoch/EpochOrchestrator.java @@ -72,16 +72,18 @@ public class EpochOrchestrator { // first renew and update epoch at org.apache.kylin.rest.discovery.KylinServiceDiscoveryCache#createServiceCache long pollSecond = kylinConfig.getEpochCheckerIntervalSecond(); logger.info("Try to update/renew epoch every {} seconds", pollSecond); + EpochChecker epochChecker = new EpochChecker(); + epochChecker.run(); if (!kylinConfig.getEpochCheckerEnabled()) { // this logic can be used when there is only one All or Job KE node logger.info("Disable epoch timing renew, renew epoch only once"); checkerPool = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("EpochChecker")); - checkerPool.scheduleWithFixedDelay(new EpochChecker(), 1, pollSecond, TimeUnit.SECONDS); + checkerPool.scheduleWithFixedDelay(epochChecker, pollSecond + 1, pollSecond, TimeUnit.SECONDS); return; } logger.info("Renew executor work size is :{}", kylinConfig.getRenewEpochWorkerPoolSize()); checkerPool = Executors.newScheduledThreadPool(2, new NamedThreadFactory("EpochChecker")); - checkerPool.scheduleWithFixedDelay(new EpochChecker(), 1, pollSecond, TimeUnit.SECONDS); + checkerPool.scheduleWithFixedDelay(epochChecker, pollSecond + 1, pollSecond, TimeUnit.SECONDS); checkerPool.scheduleAtFixedRate(new EpochRenewer(), pollSecond, pollSecond, TimeUnit.SECONDS); }