This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit db02b7f93b74b52662466983660bfda59d0a5dd9
Author: luoping.zhang <luoping.zh...@kyligence.io>
AuthorDate: Fri May 12 17:23:18 2023 +0800

    KYLIN-5690 Optimize the Kylin read metadata for maxId
---
 .../apache/kylin/rest/config/AppInitializer.java   | 18 +++++++-
 .../service/task/QueryHistoryTaskScheduler.java    | 52 ++++++++--------------
 .../task/QueryHistoryTaskSchedulerTest.java        |  9 +---
 .../kylin/metadata/epoch/EpochOrchestrator.java    |  6 ++-
 4 files changed, 42 insertions(+), 43 deletions(-)

diff --git a/src/common-service/src/main/java/org/apache/kylin/rest/config/AppInitializer.java b/src/common-service/src/main/java/org/apache/kylin/rest/config/AppInitializer.java
index f36aa23f26..9f83f39451 100644
--- a/src/common-service/src/main/java/org/apache/kylin/rest/config/AppInitializer.java
+++ b/src/common-service/src/main/java/org/apache/kylin/rest/config/AppInitializer.java
@@ -38,6 +38,7 @@ import org.apache.kylin.common.util.AddressUtil;
 import org.apache.kylin.common.util.HostInfoFetcher;
 import org.apache.kylin.engine.spark.filter.QueryFiltersCollector;
 import org.apache.kylin.engine.spark.utils.SparkJobFactoryUtils;
+import org.apache.kylin.metadata.epoch.EpochManager;
 import org.apache.kylin.metadata.epoch.EpochOrchestrator;
 import org.apache.kylin.metadata.project.NProjectLoader;
 import org.apache.kylin.metadata.project.NProjectManager;
@@ -55,6 +56,7 @@ import org.apache.kylin.rest.config.initialize.SparderStartEvent;
 import org.apache.kylin.rest.config.initialize.TableSchemaChangeListener;
 import org.apache.kylin.rest.config.initialize.UserAclListener;
 import org.apache.kylin.rest.service.CommonQueryCacheSupporter;
+import org.apache.kylin.rest.service.task.QueryHistoryTaskScheduler;
 import org.apache.kylin.rest.util.JStackDumpTask;
 import org.apache.kylin.streaming.jobs.StreamingJobListener;
 import org.apache.kylin.tool.daemon.KapGuardianHATask;
@@ -175,12 +177,24 @@ public class AppInitializer {
         log.info("The system cache is warmed up.");
     }
 
+    private void resetProjectOffsetId() {
+        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+        List<ProjectInstance> prjInstances = NProjectManager.getInstance(kylinConfig).listAllProjects();
+        EpochManager epochManager = EpochManager.getInstance();
+        prjInstances.forEach(project -> {
+            if (epochManager.checkEpochOwner(project.getName())) {
+                QueryHistoryTaskScheduler.getInstance(project.getName()).resetOffsetId();
+            }
+        });
+    }
+
     @EventListener(ApplicationReadyEvent.class)
     public void afterReady(ApplicationReadyEvent ignoredEvent) {
         val kylinConfig = KylinConfig.getInstanceFromEnv();
         setFsUrlStreamHandlerFactory();
         if (kylinConfig.isJobNode()) {
             new EpochOrchestrator(kylinConfig);
+            resetProjectOffsetId();
         }
         if (kylinConfig.getJStackDumpTaskEnabled()) {
             taskScheduler.scheduleAtFixedRate(new JStackDumpTask(),
@@ -193,7 +207,9 @@ public class AppInitializer {
                     kylinConfig.getGuardianHACheckInterval() * Constant.SECOND);
         }
 
-        taskScheduler.scheduleAtFixedRate(new ProjectSerialEventBus.TimingDispatcher(), ProjectSerialEventBus.TimingDispatcher.INTERVAL);
+        taskScheduler.scheduleAtFixedRate(new ProjectSerialEventBus.TimingDispatcher(),
+                ProjectSerialEventBus.TimingDispatcher.INTERVAL);
+
     }
 
     private void postInit() {
diff --git a/src/common-service/src/main/java/org/apache/kylin/rest/service/task/QueryHistoryTaskScheduler.java b/src/common-service/src/main/java/org/apache/kylin/rest/service/task/QueryHistoryTaskScheduler.java
index 759b2db724..86e5d286ba 100644
--- a/src/common-service/src/main/java/org/apache/kylin/rest/service/task/QueryHistoryTaskScheduler.java
+++ b/src/common-service/src/main/java/org/apache/kylin/rest/service/task/QueryHistoryTaskScheduler.java
@@ -130,6 +130,21 @@ public class QueryHistoryTaskScheduler {
         log.info("Query history task scheduler is started for [{}] ", project);
     }
 
+    public void resetOffsetId() {
+        EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
+            long maxId = queryHistoryDAO.getQueryHistoryMaxId(project);
+            KylinConfig config = KylinConfig.getInstanceFromEnv();
+            QueryHistoryIdOffsetManager manager = QueryHistoryIdOffsetManager.getInstance(config, project);
+            QueryHistoryIdOffset queryHistoryIdOffset = manager.get();
+            if (queryHistoryIdOffset.getOffset() > maxId || queryHistoryIdOffset.getStatMetaUpdateOffset() > maxId) {
+                queryHistoryIdOffset.setOffset(maxId);
+                queryHistoryIdOffset.setStatMetaUpdateOffset(maxId);
+                manager.save(queryHistoryIdOffset);
+            }
+            return 0;
+        }, project);
+    }
+
     public Future scheduleImmediately(QueryHistoryTask runner) {
         return taskScheduler.schedule(runner, 10L, TimeUnit.SECONDS);
     }
@@ -174,10 +189,8 @@ public class QueryHistoryTaskScheduler {
         protected List<QueryHistory> getQueryHistories(int batchSize) {
             QueryHistoryIdOffsetManager qhIdOffsetManager = QueryHistoryIdOffsetManager
                     .getInstance(KylinConfig.getInstanceFromEnv(), project);
-            List<QueryHistory> queryHistoryList = queryHistoryDAO.queryQueryHistoriesByIdOffset(
-                    qhIdOffsetManager.get().getStatMetaUpdateOffset(), batchSize, project);
-            resetIdOffset(queryHistoryList);
-            return queryHistoryList;
+            return queryHistoryDAO.queryQueryHistoriesByIdOffset(qhIdOffsetManager.get().getStatMetaUpdateOffset(),
+                    batchSize, project);
         }
 
         @Override
@@ -350,10 +363,8 @@ public class QueryHistoryTaskScheduler {
         protected List<QueryHistory> getQueryHistories(int batchSize) {
             QueryHistoryIdOffsetManager qhIdOffsetManager = QueryHistoryIdOffsetManager
                     .getInstance(KylinConfig.getInstanceFromEnv(), project);
-            List<QueryHistory> queryHistoryList = queryHistoryDAO
-                    .queryQueryHistoriesByIdOffset(qhIdOffsetManager.get().getOffset(), batchSize, project);
-            resetIdOffset(queryHistoryList);
-            return queryHistoryList;
+            return queryHistoryDAO.queryQueryHistoriesByIdOffset(qhIdOffsetManager.get().getOffset(), batchSize,
+                    project);
         }
 
         @Override
@@ -428,31 +439,6 @@ public class QueryHistoryTaskScheduler {
 
         protected abstract String name();
 
-        private volatile boolean needResetOffset = true;
-
-        protected void resetIdOffset(List<QueryHistory> queryHistories) {
-            if (needResetOffset && CollectionUtils.isEmpty(queryHistories)) {
-                long maxId = queryHistoryDAO.getQueryHistoryMaxId(project);
-                resetIdOffset(maxId);
-            }
-        }
-
-        private void resetIdOffset(long maxId) {
-            EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
-                KylinConfig config = KylinConfig.getInstanceFromEnv();
-                QueryHistoryIdOffsetManager manager = QueryHistoryIdOffsetManager.getInstance(config, project);
-                QueryHistoryIdOffset queryHistoryIdOffset = manager.get();
-                if (queryHistoryIdOffset.getOffset() > maxId
-                        || queryHistoryIdOffset.getStatMetaUpdateOffset() > maxId) {
-                    queryHistoryIdOffset.setOffset(maxId);
-                    queryHistoryIdOffset.setStatMetaUpdateOffset(maxId);
-                    manager.save(queryHistoryIdOffset);
-                }
-                needResetOffset = false;
-                return 0;
-            }, project);
-        }
-
         public void batchHandle(int batchSize, int maxSize, Consumer<List<QueryHistory>> consumer) {
             if (!(batchSize > 0 && maxSize >= batchSize)) {
                 throw new IllegalArgumentException(String.format(Locale.ROOT,
diff --git a/src/common-service/src/test/java/org/apache/kylin/rest/service/task/QueryHistoryTaskSchedulerTest.java b/src/common-service/src/test/java/org/apache/kylin/rest/service/task/QueryHistoryTaskSchedulerTest.java
index 9cff224d88..9e70136557 100644
--- a/src/common-service/src/test/java/org/apache/kylin/rest/service/task/QueryHistoryTaskSchedulerTest.java
+++ b/src/common-service/src/test/java/org/apache/kylin/rest/service/task/QueryHistoryTaskSchedulerTest.java
@@ -150,9 +150,7 @@ public class QueryHistoryTaskSchedulerTest extends NLocalFileMetadataTestCase {
         idOffsetManager.save(queryHistoryIdOffset);
         Assert.assertEquals(999L, idOffsetManager.get().getOffset());
         // run update
-        QueryHistoryTaskScheduler.QueryHistoryAccelerateRunner queryHistoryAccelerateRunner = //
-                qhAccelerateScheduler.new QueryHistoryAccelerateRunner(false);
-        queryHistoryAccelerateRunner.run();
+        qhAccelerateScheduler.resetOffsetId();
         // after auto reset offset
         Assert.assertEquals(0L, idOffsetManager.get().getOffset());
     }
@@ -173,10 +171,7 @@ public class QueryHistoryTaskSchedulerTest extends NLocalFileMetadataTestCase {
         Assert.assertEquals(9L, idOffsetManager.get().getOffset());
 
         // run update
-        QueryHistoryTaskScheduler.QueryHistoryAccelerateRunner queryHistoryAccelerateRunner = //
-                qhAccelerateScheduler.new QueryHistoryAccelerateRunner(false);
-        queryHistoryAccelerateRunner.run();
-        // after auto reset offset
+        qhAccelerateScheduler.resetOffsetId();
         Assert.assertEquals(9L, idOffsetManager.get().getOffset());
     }
 
diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/epoch/EpochOrchestrator.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/epoch/EpochOrchestrator.java
index 62dfdbec88..d11884abfd 100644
--- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/epoch/EpochOrchestrator.java
+++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/epoch/EpochOrchestrator.java
@@ -72,16 +72,18 @@ public class EpochOrchestrator {
         // first renew and update epoch at org.apache.kylin.rest.discovery.KylinServiceDiscoveryCache#createServiceCache
         long pollSecond = kylinConfig.getEpochCheckerIntervalSecond();
         logger.info("Try to update/renew epoch every {} seconds", pollSecond);
+        EpochChecker epochChecker = new EpochChecker();
+        epochChecker.run();
         if (!kylinConfig.getEpochCheckerEnabled()) {
             // this logic can be used when there is only one All or Job KE node
             logger.info("Disable epoch timing renew, renew epoch only once");
             checkerPool = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("EpochChecker"));
-            checkerPool.scheduleWithFixedDelay(new EpochChecker(), 1, pollSecond, TimeUnit.SECONDS);
+            checkerPool.scheduleWithFixedDelay(epochChecker, pollSecond + 1, pollSecond, TimeUnit.SECONDS);
             return;
         }
         logger.info("Renew executor work size is :{}", kylinConfig.getRenewEpochWorkerPoolSize());
         checkerPool = Executors.newScheduledThreadPool(2, new NamedThreadFactory("EpochChecker"));
-        checkerPool.scheduleWithFixedDelay(new EpochChecker(), 1, pollSecond, TimeUnit.SECONDS);
+        checkerPool.scheduleWithFixedDelay(epochChecker, pollSecond + 1, pollSecond, TimeUnit.SECONDS);
         checkerPool.scheduleAtFixedRate(new EpochRenewer(), pollSecond, pollSecond, TimeUnit.SECONDS);
     }
 

Reply via email to