This is an automated email from the ASF dual-hosted git repository. xxyu pushed a commit to branch kylin5 in repository https://gitbox.apache.org/repos/asf/kylin.git
commit fd95ac41cffc70441c50eef70b8ca0a462eca760 Author: sibingzhang <74443791+sibingzh...@users.noreply.github.com> AuthorDate: Wed Dec 7 10:44:53 2022 +0800 KYLIN-5433 Add system level parameter, and the storage quota function is turned off by default --- .../kylin/rest/controller/NAdminController.java | 1 + .../org/apache/kylin/common/KylinConfigBase.java | 8 ++--- .../apache/kylin/common/KylinConfigBaseTest.java | 8 +++++ .../job/impl/threadpool/NDefaultScheduler.java | 2 +- .../job/impl/threadpool/NDefaultSchedulerTest.java | 2 +- .../cube/storage/GarbageStorageCollector.java | 2 +- .../cube/storage/StorageInfoCollector.java | 8 ++++- .../cube/storage/StorageQuotaCollector.java | 2 +- .../cube/storage/TotalStorageCollector.java | 2 +- .../apache/kylin/metrics/HdfsCapacityMetrics.java | 38 +++++++++++----------- .../storage/ProjectStorageInfoCollectorTest.java | 16 +++++++++ .../kylin/metrics/HdfsCapacityMetricsTest.java | 26 +++++++++------ .../kylin/rest/service/ProjectServiceTest.java | 1 + 13 files changed, 77 insertions(+), 39 deletions(-) diff --git a/src/common-server/src/main/java/org/apache/kylin/rest/controller/NAdminController.java b/src/common-server/src/main/java/org/apache/kylin/rest/controller/NAdminController.java index d134bf04b9..42ece3badc 100644 --- a/src/common-server/src/main/java/org/apache/kylin/rest/controller/NAdminController.java +++ b/src/common-server/src/main/java/org/apache/kylin/rest/controller/NAdminController.java @@ -75,6 +75,7 @@ public class NAdminController extends NBasicController { propertyKeys.add("kylin.model.measure-name-check-enabled"); propertyKeys.add("kylin.security.remove-ldap-custom-security-limit-enabled"); propertyKeys.add("kylin.source.ddl.enabled"); + propertyKeys.add("kylin.storage.check-quota-enabled"); // add second storage if (StringUtils.isNotEmpty(KylinConfig.getInstanceFromEnv().getSecondStorage())) { diff --git a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 61b8185569..bbff8f22e8 100644 --- a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -1208,10 +1208,6 @@ public abstract class KylinConfigBase implements Serializable { } - public boolean isCheckQuotaStorageEnabled() { - return Boolean.parseBoolean(getOptional("kylin.job.check-quota-storage-enabled", TRUE)); - } - public boolean isDeleteJobTmpWhenRetry() { return Boolean.parseBoolean(getOptional("kylin.job.delete-job-tmp-when-retry", FALSE)); } @@ -3746,6 +3742,10 @@ public abstract class KylinConfigBase implements Serializable { return Integer.parseInt(getOptional("kylin.query.project-merge-bloat-threshold", "0")); } + public boolean isStorageQuotaEnabled() { + return Boolean.parseBoolean(getOptional("kylin.storage.check-quota-enabled", FALSE)); + } + public boolean skipShardPruningForInExpr() { return Boolean.parseBoolean(getOptional("kylin.query.skip-shard-pruning-for-in", FALSE)); } diff --git a/src/core-common/src/test/java/org/apache/kylin/common/KylinConfigBaseTest.java b/src/core-common/src/test/java/org/apache/kylin/common/KylinConfigBaseTest.java index d037fea7f7..a241b73b4c 100644 --- a/src/core-common/src/test/java/org/apache/kylin/common/KylinConfigBaseTest.java +++ b/src/core-common/src/test/java/org/apache/kylin/common/KylinConfigBaseTest.java @@ -1392,6 +1392,14 @@ class KylinConfigBaseTest { config.setProperty("kylin.build.segment-overlap-enabled", "true"); assertTrue(config.isBuildSegmentOverlapEnabled()); } + + @Test + void testIsQuotaStorageEnabled() { + KylinConfig config = KylinConfig.getInstanceFromEnv(); + assertFalse(config.isStorageQuotaEnabled()); + config.setProperty("kylin.storage.check-quota-enabled", "true"); + assertTrue(config.isStorageQuotaEnabled()); + } } class EnvironmentUpdateUtils { diff --git a/src/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/NDefaultScheduler.java b/src/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/NDefaultScheduler.java index 8be23b4c19..bcac0074f6 100644 --- a/src/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/NDefaultScheduler.java +++ b/src/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/NDefaultScheduler.java @@ -179,7 +179,7 @@ public class NDefaultScheduler implements Scheduler<AbstractExecutable> { logger.info("Fetching jobs every {} seconds", pollSecond); val fetcher = new FetcherRunner(this, jobPool, fetcherPool); - if (config.isCheckQuotaStorageEnabled()) { + if (config.isStorageQuotaEnabled()) { fetcherPool.scheduleWithFixedDelay(new QuotaStorageCheckRunner(this), RandomUtils.nextInt(0, pollSecond), pollSecond, TimeUnit.SECONDS); } diff --git a/src/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/NDefaultSchedulerTest.java b/src/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/NDefaultSchedulerTest.java index 69dd9f14f8..1ed40dbbd8 100644 --- a/src/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/NDefaultSchedulerTest.java +++ b/src/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/NDefaultSchedulerTest.java @@ -111,7 +111,7 @@ public class NDefaultSchedulerTest extends BaseSchedulerTest { public void setup() throws Exception { overwriteSystemProp("kylin.job.auto-set-concurrent-jobs", "true"); overwriteSystemProp("kylin.env", "UT"); - overwriteSystemProp("kylin.job.check-quota-storage-enabled", "true"); + overwriteSystemProp("kylin.storage.check-quota-enabled", "true"); super.setup(); } diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/storage/GarbageStorageCollector.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/storage/GarbageStorageCollector.java index 5fb6ecc8ff..b1e0856197 100644 --- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/storage/GarbageStorageCollector.java +++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/storage/GarbageStorageCollector.java @@ -42,7 +42,7 @@ import lombok.extern.slf4j.Slf4j; public class GarbageStorageCollector implements StorageInfoCollector { @Override - public void collect(KylinConfig config, String project, StorageVolumeInfo storageVolumeInfo) { + public void doCollect(KylinConfig config, String project, StorageVolumeInfo storageVolumeInfo) { Map<String, Set<Long>> garbageIndexMap = Maps.newHashMap(); long storageSize = 0L; diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/storage/StorageInfoCollector.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/storage/StorageInfoCollector.java index 4218244b06..73c63f13ac 100644 --- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/storage/StorageInfoCollector.java +++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/storage/StorageInfoCollector.java @@ -24,6 +24,12 @@ import org.apache.kylin.common.KylinConfig; public interface StorageInfoCollector { - void collect(KylinConfig config, String project, StorageVolumeInfo storageVolumeInfo) throws IOException; + default void collect(KylinConfig config, String project, StorageVolumeInfo storageVolumeInfo) throws IOException { + if (!config.isStorageQuotaEnabled()) { + return; + } + doCollect(config, project, storageVolumeInfo); + } + void doCollect(KylinConfig config, String project, StorageVolumeInfo storageVolumeInfo) throws IOException; } diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/storage/StorageQuotaCollector.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/storage/StorageQuotaCollector.java index 951f896ef6..8b04f05164 100644 --- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/storage/StorageQuotaCollector.java +++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/storage/StorageQuotaCollector.java @@ -26,7 +26,7 @@ import org.apache.kylin.metadata.project.NProjectManager; public class StorageQuotaCollector implements StorageInfoCollector { @Override - public void collect(KylinConfig config, String project, StorageVolumeInfo storageVolumeInfo) { + public void doCollect(KylinConfig config, String project, StorageVolumeInfo storageVolumeInfo) { config = NProjectManager.getInstance(config).getProject(project).getConfig(); long quotaSize = config.getStorageQuotaSize(); storageVolumeInfo.setStorageQuotaSize(quotaSize); diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/storage/TotalStorageCollector.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/storage/TotalStorageCollector.java index 02f5f4213f..3250f40cbf 100644 --- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/storage/TotalStorageCollector.java +++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/storage/TotalStorageCollector.java @@ -31,7 +31,7 @@ import org.apache.kylin.metrics.HdfsCapacityMetrics; public class TotalStorageCollector implements StorageInfoCollector { @Override - public void collect(KylinConfig config, String project, StorageVolumeInfo storageVolumeInfo) throws IOException { + public void doCollect(KylinConfig config, String project, StorageVolumeInfo storageVolumeInfo) throws IOException { long totalStorageSize = HdfsCapacityMetrics.getHdfsCapacityByProject(project); if (totalStorageSize != -1L) { log.info("Reuse workingDirCapacity by project {}, storageSize: {}", project, totalStorageSize); diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metrics/HdfsCapacityMetrics.java b/src/core-metadata/src/main/java/org/apache/kylin/metrics/HdfsCapacityMetrics.java index 236fb5c556..6cb031ffc2 100644 --- a/src/core-metadata/src/main/java/org/apache/kylin/metrics/HdfsCapacityMetrics.java +++ b/src/core-metadata/src/main/java/org/apache/kylin/metrics/HdfsCapacityMetrics.java @@ -18,15 +18,7 @@ package org.apache.kylin.metrics; -import java.io.IOException; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; - +import lombok.extern.slf4j.Slf4j; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; @@ -40,7 +32,14 @@ import org.apache.kylin.common.util.NamedThreadFactory; import org.apache.kylin.metadata.project.NProjectManager; import org.apache.kylin.metadata.project.ProjectInstance; -import lombok.extern.slf4j.Slf4j; +import java.io.IOException; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; /** * 1. Unify the entry point for all calculation calls to obtain the capacity of the WorkingDir through scheduled threads @@ -57,6 +56,7 @@ public class HdfsCapacityMetrics { protected static final FileSystem WORKING_FS; protected static final ScheduledExecutorService HDFS_METRICS_SCHEDULED_EXECUTOR; protected static boolean hdfsMetricsPeriodicCalculationEnabled; + protected static boolean quotaStorageEnabled; // For all places that need to query WorkingDir capacity for retrieval, initialize to avoid NPE protected static ConcurrentMap<String, Long> workingDirCapacity = new ConcurrentHashMap<>(); // Used to clear the existing workingDirCapacity in memory, you cannot use the clear method for workingDirCapacity @@ -69,8 +69,7 @@ public class HdfsCapacityMetrics { SERVICE_INFO = AddressUtil.getLocalInstance(); WORKING_FS = HadoopUtil.getWorkingFileSystem(); HDFS_CAPACITY_METRICS_PATH = new Path(KYLIN_CONFIG.getHdfsMetricsDir("hdfsCapacity.json")); - HDFS_METRICS_SCHEDULED_EXECUTOR = Executors.newScheduledThreadPool(1, - new NamedThreadFactory("HdfsMetricsChecker")); + HDFS_METRICS_SCHEDULED_EXECUTOR = Executors.newScheduledThreadPool(1, new NamedThreadFactory("HdfsMetricsChecker")); registerHdfsMetrics(); } @@ -85,10 +84,11 @@ public class HdfsCapacityMetrics { // 3. Junk cleanup: theoretically the file will not be very large, do not need to consider cleaning up for the time // being, cleaning will affect the recalculation of the directory involved hdfsMetricsPeriodicCalculationEnabled = KYLIN_CONFIG.isHdfsMetricsPeriodicCalculationEnabled(); - if (hdfsMetricsPeriodicCalculationEnabled) { - log.info("HDFS metrics periodic calculation is enabled, path: {}", HDFS_CAPACITY_METRICS_PATH); - HDFS_METRICS_SCHEDULED_EXECUTOR.scheduleAtFixedRate(HdfsCapacityMetrics::handleNodeHdfsMetrics, 0, - KYLIN_CONFIG.getHdfsMetricsPeriodicCalculationInterval(), TimeUnit.MILLISECONDS); + quotaStorageEnabled = KYLIN_CONFIG.isStorageQuotaEnabled(); + if (quotaStorageEnabled && hdfsMetricsPeriodicCalculationEnabled) { + log.info("Quota storage and HDFS metrics periodic calculation are enabled, path: {}", HDFS_CAPACITY_METRICS_PATH); + HDFS_METRICS_SCHEDULED_EXECUTOR.scheduleAtFixedRate(HdfsCapacityMetrics::handleNodeHdfsMetrics, + 0, KYLIN_CONFIG.getHdfsMetricsPeriodicCalculationInterval(), TimeUnit.MILLISECONDS); } } @@ -106,8 +106,8 @@ public class HdfsCapacityMetrics { public static void writeHdfsMetrics() { prepareForWorkingDirCapacity.clear(); // All WorkingDir capacities involved are calculated here - Set<String> allProjects = NProjectManager.getInstance(KYLIN_CONFIG).listAllProjects().stream() - .map(ProjectInstance::getName).collect(Collectors.toSet()); + Set<String> allProjects = NProjectManager.getInstance(KYLIN_CONFIG).listAllProjects() + .stream().map(ProjectInstance::getName).collect(Collectors.toSet()); try { for (String project : allProjects) { // Should not initialize projectTotalStorageSize outside the loop, otherwise it may affect the next calculation @@ -161,4 +161,4 @@ public class HdfsCapacityMetrics { } return -1L; } -} +} \ No newline at end of file diff --git a/src/core-metadata/src/test/java/org/apache/kylin/metadata/cube/storage/ProjectStorageInfoCollectorTest.java b/src/core-metadata/src/test/java/org/apache/kylin/metadata/cube/storage/ProjectStorageInfoCollectorTest.java index 2370c67714..6dd8df4aa6 100644 --- a/src/core-metadata/src/test/java/org/apache/kylin/metadata/cube/storage/ProjectStorageInfoCollectorTest.java +++ b/src/core-metadata/src/test/java/org/apache/kylin/metadata/cube/storage/ProjectStorageInfoCollectorTest.java @@ -90,6 +90,7 @@ public class ProjectStorageInfoCollectorTest extends NLocalFileMetadataTestCase @Test public void testGetStorageVolumeInfo() { + overwriteSystemProp("kylin.storage.check-quota-enabled", "true"); getTestConfig().setProperty("kylin.metadata.semi-automatic-mode", "true"); initTestData(); @@ -163,6 +164,7 @@ public class ProjectStorageInfoCollectorTest extends NLocalFileMetadataTestCase @Test public void testLowFreqStrategyOfFreqTimeWindow() { + overwriteSystemProp("kylin.storage.check-quota-enabled", "true"); getTestConfig().setProperty("kylin.metadata.semi-automatic-mode", "true"); initTestData(); val collector = new ProjectStorageInfoCollector(Lists.newArrayList(StorageInfoEnum.GARBAGE_STORAGE)); @@ -174,6 +176,7 @@ public class ProjectStorageInfoCollectorTest extends NLocalFileMetadataTestCase @Test public void testLowFreqStrategyOfLowFreqStrategyThreshold() { + overwriteSystemProp("kylin.storage.check-quota-enabled", "true"); getTestConfig().setProperty("kylin.metadata.semi-automatic-mode", "true"); initTestData(); val collector = new ProjectStorageInfoCollector(Lists.newArrayList(StorageInfoEnum.GARBAGE_STORAGE)); @@ -459,6 +462,7 @@ public class ProjectStorageInfoCollectorTest extends NLocalFileMetadataTestCase @Test public void testGetStorageVolumeWithOutHdfsCapacityMetrics() throws IOException { + overwriteSystemProp("kylin.storage.check-quota-enabled", "true"); KylinConfig testConfig = getTestConfig(); overwriteSystemProp("kylin.metrics.hdfs-periodic-calculation-enabled", "false"); StorageVolumeInfo storageVolumeInfo = Mockito.spy(StorageVolumeInfo.class); @@ -469,6 +473,7 @@ public class ProjectStorageInfoCollectorTest extends NLocalFileMetadataTestCase @Test public void testGetStorageVolumeWithHdfsCapacityMetrics() throws IOException { + overwriteSystemProp("kylin.storage.check-quota-enabled", "true"); KylinConfig testConfig = getTestConfig(); overwriteSystemProp("kylin.metrics.hdfs-periodic-calculation-enabled", "true"); HdfsCapacityMetrics.registerHdfsMetrics(); @@ -478,4 +483,15 @@ public class ProjectStorageInfoCollectorTest extends NLocalFileMetadataTestCase Assert.assertEquals(0, storageVolumeInfo.getTotalStorageSize()); } + @Test + public void testGetStorageVolumeQuotaStorageEnabledFalse() throws IOException { + overwriteSystemProp("kylin.storage.check-quota-enabled", "false"); + KylinConfig testConfig = getTestConfig(); + HdfsCapacityMetrics.registerHdfsMetrics(); + StorageVolumeInfo storageVolumeInfo = Mockito.spy(StorageVolumeInfo.class); + TotalStorageCollector totalStorageCollector = new TotalStorageCollector(); + totalStorageCollector.collect(testConfig, DEFAULT_PROJECT, storageVolumeInfo); + Assert.assertEquals(-1, storageVolumeInfo.getTotalStorageSize()); + } + } diff --git a/src/core-metadata/src/test/java/org/apache/kylin/metrics/HdfsCapacityMetricsTest.java b/src/core-metadata/src/test/java/org/apache/kylin/metrics/HdfsCapacityMetricsTest.java index 3613eab893..9239ff9093 100644 --- a/src/core-metadata/src/test/java/org/apache/kylin/metrics/HdfsCapacityMetricsTest.java +++ b/src/core-metadata/src/test/java/org/apache/kylin/metrics/HdfsCapacityMetricsTest.java @@ -50,6 +50,7 @@ public class HdfsCapacityMetricsTest extends NLocalFileMetadataTestCase { @Test public void testRegisterHdfsMetricsFailed() { + overwriteSystemProp("kylin.storage.check-quota-enabled", "true"); HdfsCapacityMetrics.registerHdfsMetrics(); // scheduledExecutor may like this // java.util.concurrent.ScheduledThreadPoolExecutor@5bf61e67[Running, pool size = 1, active threads = 1, queued tasks = 1, completed tasks = 0] @@ -64,6 +65,7 @@ public class HdfsCapacityMetricsTest extends NLocalFileMetadataTestCase { @Test @Ignore("KE-40537") public void testRegisterHdfsMetrics() { + overwriteSystemProp("kylin.storage.check-quota-enabled", "true"); overwriteSystemProp("kylin.metrics.hdfs-periodic-calculation-enabled", "true"); HdfsCapacityMetrics.registerHdfsMetrics(); // scheduledExecutor may like this @@ -73,15 +75,19 @@ public class HdfsCapacityMetricsTest extends NLocalFileMetadataTestCase { int activeThreadIdx = scheduledExecutor.indexOf(activeThreadStr); String thread = scheduledExecutor.substring(activeThreadIdx + activeThreadStr.length(), activeThreadIdx + activeThreadStr.length() + 1); - if (Integer.parseInt(thread) != 0) { - Assert.assertEquals(1, Integer.parseInt(thread)); - } else { - String queuedThreadStr = "queued tasks = "; - int queuedThreadIdx = scheduledExecutor.indexOf(queuedThreadStr); - int queued = Integer.parseInt(scheduledExecutor.substring(queuedThreadIdx + queuedThreadStr.length(), - queuedThreadIdx + queuedThreadStr.length() + 1)); - Assert.assertTrue(1 <= queued); - } + Assert.assertEquals(1, Integer.parseInt(thread)); + } + + @Test + public void testRegisterHdfsMetricsQuotaStorageEnabledFalse() { + overwriteSystemProp("kylin.storage.check-quota-enabled", "false"); + HdfsCapacityMetrics.registerHdfsMetrics(); + String scheduledExecutor = HdfsCapacityMetrics.HDFS_METRICS_SCHEDULED_EXECUTOR.toString(); + String activeThreadStr = "active threads = "; + int activeThreadIdx = scheduledExecutor.indexOf(activeThreadStr); + String thread = scheduledExecutor.substring(activeThreadIdx + activeThreadStr.length(), + activeThreadIdx + activeThreadStr.length() + 1); + Assert.assertEquals(0, Integer.parseInt(thread)); } @Test @@ -147,4 +153,4 @@ public class HdfsCapacityMetricsTest extends NLocalFileMetadataTestCase { overwriteSystemProp("kylin.metrics.hdfs-periodic-calculation-enabled", "true"); Assert.assertEquals(0L, (long) HdfsCapacityMetrics.getHdfsCapacityByProject("kylin")); } -} +} \ No newline at end of file diff --git a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ProjectServiceTest.java b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ProjectServiceTest.java index 62ded88b63..70936d305e 100644 --- a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ProjectServiceTest.java +++ b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ProjectServiceTest.java @@ -304,6 +304,7 @@ public class ProjectServiceTest extends NLocalFileMetadataTestCase { @Test public void testGetStorageVolumeInfoResponse() { + overwriteSystemProp("kylin.storage.check-quota-enabled", "true"); getTestConfig().setProperty("kylin.metadata.semi-automatic-mode", "true"); prepareLayoutHitCount(); String error = "do not use aclEvalute in getStorageVolumeInfoResponse, because backend thread would invoke this method in (BootstrapCommand.class)";