This is an automated email from the ASF dual-hosted git repository. xxyu pushed a commit to branch kylin5 in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 67ed3bed4e7551c240e00357e7b406d62df0b9c5 Author: qianhao.zhou <z.qian...@gmail.com> AuthorDate: Tue Jan 3 10:10:07 2023 +0800 KYLIN-5492 service modules refactor Co-authored-by: qhzhou <qianhao.z...@kyligence.io> --- src/common-service/pom.xml | 21 ++++++++-- .../apache/kylin/rest/config/AppInitializer.java | 4 +- .../initialize/DataSourceAppInitializer.java | 0 .../apache/kylin/rest/constant/JobInfoEnum.java | 0 .../kylin/rest/request/AWSTableLoadRequest.java | 0 .../kylin/rest/request/AutoMergeRequest.java | 0 .../org/apache/kylin/rest/request/DDLRequest.java | 0 .../kylin/rest/request/DateRangeRequest.java | 0 .../kylin/rest/request/ExportTableRequest.java | 0 .../kylin/rest/request/OpenReloadTableRequest.java | 0 .../kylin/rest/request/PartitionKeyRequest.java | 0 .../kylin/rest/request/ReloadTableRequest.java | 0 .../apache/kylin/rest/request/S3TableExtInfo.java | 0 .../apache/kylin/rest/request/SamplingRequest.java | 0 .../kylin/rest/request/StreamingRequest.java | 0 .../kylin/rest/request/StreamingTableRequest.java | 0 .../kylin/rest/request/TableDescRequest.java | 0 .../kylin/rest/request/TableLoadRequest.java | 0 .../apache/kylin/rest/request/TopTableRequest.java | 0 .../rest/request/UpdateAWSTableExtDescRequest.java | 0 .../org/apache/kylin/rest/request/ViewRequest.java | 0 .../rest/response/AutoMergeConfigResponse.java | 0 .../rest/response/BatchLoadTableResponse.java | 0 .../apache/kylin/rest/response/DDLResponse.java | 0 .../rest/response/ExistedDataRangeResponse.java | 0 .../kylin/rest/response/ExportTablesResponse.java | 0 .../kylin/rest/response/LoadTableResponse.java | 0 .../rest/response/NHiveTableNameResponse.java | 0 .../kylin/rest/response/NInitTablesResponse.java | 0 .../rest/response/OpenPreReloadTableResponse.java | 0 .../rest/response/PreReloadTableResponse.java | 0 .../rest/response/PreUnloadTableResponse.java | 0 .../kylin/rest/response/TableDescResponse.java | 0 .../kylin/rest/response/TableNameResponse.java | 0 .../apache/kylin/rest/response/TableRefresh.java | 0 .../kylin/rest/response/TableRefreshAll.java | 0 .../rest/response/TablesAndColumnsResponse.java | 0 .../response/UpdateAWSTableExtDescResponse.java | 0 .../kylin/rest/service/CustomFileService.java | 0 .../apache/kylin/rest/service/JobSupporter.java | 0 .../rest/service/TableFusionModelSupporter.java | 0 .../rest/service/TableIndexPlanSupporter.java | 0 .../kylin/rest/service/TableModelSupporter.java | 0 .../kylin/rest/service/TableSamplingSupporter.java | 0 .../apache/kylin/rest/source/DataSourceState.java | 0 .../apache/kylin/rest/source/NHiveSourceInfo.java | 0 .../org/apache/kylin/rest/util/TableUtils.java | 0 .../kylin/rest/service/CustomFileServiceTest.java | 0 .../apache/kylin/rest/service/ServiceTestBase.java | 8 ++-- .../apache/kylin/rest/service/SourceTestCase.java | 14 +++---- .../kylin/rest/source/DataSourceStateTest.java | 0 .../apache/kylin/loader/AddToClassPathAction.java | 0 .../org/apache/kylin/loader/ParserClassLoader.java | 0 .../kylin/loader/ParserClassLoaderState.java | 0 .../kylin/loader/utils/ClassLoaderUtils.java | 0 .../kylin/loader/AddToClassPathActionTest.java | 0 .../kylin/loader/ParserClassLoaderStateTest.java | 0 .../apache/kylin/engine/spark/ExecutableUtils.java | 36 ++++------------ src/data-loading-service/pom.xml | 2 +- .../rest/service/StreamingTableServiceTest.java | 4 +- .../rest/config/initialize/JobSchedulerTest.java | 46 ++++++++++---------- .../org/apache/kylin/event/ModelSemanticTest.java | 4 +- .../service/ModelServiceSemanticUpdateTest.java | 9 ++-- .../rest/service/LocalFileMetadataTestCase.java | 6 +-- .../kylin/rest/service/ModelServiceQueryTest.java | 4 +- .../newten/clickhouse/ClickHouseSimpleITTest.java | 4 +- .../kap/secondstorage/test/EnableScheduler.java | 4 +- .../service/ModelServiceWithSecondStorageTest.java | 4 +- .../spark/merger/AfterBuildResourceMerger.java | 4 +- .../merger/AfterMergeOrRefreshResourceMerger.java | 3 +- .../engine/spark/utils/SparkJobFactoryUtils.java | 49 ++++++++++++++++++++++ .../engine/spark/NLocalWithSparkSessionTest.java | 3 +- .../kylin/engine/spark/job/JobManagerTest.java | 4 +- 73 files changed, 143 insertions(+), 90 deletions(-) diff --git a/src/common-service/pom.xml b/src/common-service/pom.xml index e8f7040c16..219eaa1c87 100644 --- a/src/common-service/pom.xml +++ b/src/common-service/pom.xml @@ -115,6 +115,11 @@ <artifactId>lombok</artifactId> <scope>provided</scope> </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-core_2.12</artifactId> + <scope>provided</scope> + </dependency> <dependency> @@ -189,12 +194,22 @@ </dependency> <dependency> <groupId>org.apache.spark</groupId> - <artifactId>spark-core_2.12</artifactId> - <scope>provided</scope> + <artifactId>spark-sql_2.12</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.hive</groupId> + <artifactId>hive-exec</artifactId> + <scope>test</scope> </dependency> <dependency> <groupId>org.apache.spark</groupId> - <artifactId>spark-sql_2.12</artifactId> + <artifactId>spark-hive_2.12</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-core</artifactId> <scope>test</scope> </dependency> </dependencies> diff --git a/src/common-service/src/main/java/org/apache/kylin/rest/config/AppInitializer.java b/src/common-service/src/main/java/org/apache/kylin/rest/config/AppInitializer.java index cde0a43716..560e53c0a6 100644 --- a/src/common-service/src/main/java/org/apache/kylin/rest/config/AppInitializer.java +++ b/src/common-service/src/main/java/org/apache/kylin/rest/config/AppInitializer.java @@ -35,7 +35,7 @@ import org.apache.kylin.common.persistence.transaction.EventListenerRegistry; import org.apache.kylin.common.scheduler.EventBusFactory; import org.apache.kylin.common.util.AddressUtil; import org.apache.kylin.common.util.HostInfoFetcher; -import org.apache.kylin.engine.spark.ExecutableUtils; +import org.apache.kylin.engine.spark.utils.SparkJobFactoryUtils; import org.apache.kylin.metadata.epoch.EpochOrchestrator; import org.apache.kylin.metadata.project.NProjectLoader; import org.apache.kylin.metadata.project.NProjectManager; @@ -114,7 +114,7 @@ public class AppInitializer { EventBusFactory.getInstance().register(new ProcessStatusListener(), true); EventBusFactory.getInstance().register(new StreamingJobListener(), true); - ExecutableUtils.initJobFactory(); + SparkJobFactoryUtils.initJobFactory(); } else { val auditLogStore = new JdbcAuditLogStore(kylinConfig); val epochStore = EpochStore.getEpochStore(kylinConfig); diff --git a/src/datasource-service/src/main/java/org/apache/kylin/rest/config/initialize/DataSourceAppInitializer.java b/src/common-service/src/main/java/org/apache/kylin/rest/config/initialize/DataSourceAppInitializer.java similarity index 100% rename from src/datasource-service/src/main/java/org/apache/kylin/rest/config/initialize/DataSourceAppInitializer.java rename to src/common-service/src/main/java/org/apache/kylin/rest/config/initialize/DataSourceAppInitializer.java diff --git a/src/datasource-service/src/main/java/org/apache/kylin/rest/constant/JobInfoEnum.java b/src/common-service/src/main/java/org/apache/kylin/rest/constant/JobInfoEnum.java similarity index 100% rename from src/datasource-service/src/main/java/org/apache/kylin/rest/constant/JobInfoEnum.java rename to src/common-service/src/main/java/org/apache/kylin/rest/constant/JobInfoEnum.java diff --git a/src/datasource-service/src/main/java/org/apache/kylin/rest/request/AWSTableLoadRequest.java b/src/common-service/src/main/java/org/apache/kylin/rest/request/AWSTableLoadRequest.java similarity index 100% rename from src/datasource-service/src/main/java/org/apache/kylin/rest/request/AWSTableLoadRequest.java rename to src/common-service/src/main/java/org/apache/kylin/rest/request/AWSTableLoadRequest.java diff --git a/src/datasource-service/src/main/java/org/apache/kylin/rest/request/AutoMergeRequest.java b/src/common-service/src/main/java/org/apache/kylin/rest/request/AutoMergeRequest.java similarity index 100% rename from src/datasource-service/src/main/java/org/apache/kylin/rest/request/AutoMergeRequest.java rename to src/common-service/src/main/java/org/apache/kylin/rest/request/AutoMergeRequest.java diff --git a/src/datasource-service/src/main/java/org/apache/kylin/rest/request/DDLRequest.java b/src/common-service/src/main/java/org/apache/kylin/rest/request/DDLRequest.java similarity index 100% rename from src/datasource-service/src/main/java/org/apache/kylin/rest/request/DDLRequest.java rename to src/common-service/src/main/java/org/apache/kylin/rest/request/DDLRequest.java diff --git a/src/datasource-service/src/main/java/org/apache/kylin/rest/request/DateRangeRequest.java b/src/common-service/src/main/java/org/apache/kylin/rest/request/DateRangeRequest.java similarity index 100% rename from src/datasource-service/src/main/java/org/apache/kylin/rest/request/DateRangeRequest.java rename to src/common-service/src/main/java/org/apache/kylin/rest/request/DateRangeRequest.java diff --git a/src/datasource-service/src/main/java/org/apache/kylin/rest/request/ExportTableRequest.java b/src/common-service/src/main/java/org/apache/kylin/rest/request/ExportTableRequest.java similarity index 100% rename from src/datasource-service/src/main/java/org/apache/kylin/rest/request/ExportTableRequest.java rename to src/common-service/src/main/java/org/apache/kylin/rest/request/ExportTableRequest.java diff --git a/src/datasource-service/src/main/java/org/apache/kylin/rest/request/OpenReloadTableRequest.java b/src/common-service/src/main/java/org/apache/kylin/rest/request/OpenReloadTableRequest.java similarity index 100% rename from src/datasource-service/src/main/java/org/apache/kylin/rest/request/OpenReloadTableRequest.java rename to src/common-service/src/main/java/org/apache/kylin/rest/request/OpenReloadTableRequest.java diff --git a/src/datasource-service/src/main/java/org/apache/kylin/rest/request/PartitionKeyRequest.java b/src/common-service/src/main/java/org/apache/kylin/rest/request/PartitionKeyRequest.java similarity index 100% rename from src/datasource-service/src/main/java/org/apache/kylin/rest/request/PartitionKeyRequest.java rename to src/common-service/src/main/java/org/apache/kylin/rest/request/PartitionKeyRequest.java diff --git a/src/datasource-service/src/main/java/org/apache/kylin/rest/request/ReloadTableRequest.java b/src/common-service/src/main/java/org/apache/kylin/rest/request/ReloadTableRequest.java similarity index 100% rename from src/datasource-service/src/main/java/org/apache/kylin/rest/request/ReloadTableRequest.java rename to src/common-service/src/main/java/org/apache/kylin/rest/request/ReloadTableRequest.java diff --git a/src/datasource-service/src/main/java/org/apache/kylin/rest/request/S3TableExtInfo.java b/src/common-service/src/main/java/org/apache/kylin/rest/request/S3TableExtInfo.java similarity index 100% rename from src/datasource-service/src/main/java/org/apache/kylin/rest/request/S3TableExtInfo.java rename to src/common-service/src/main/java/org/apache/kylin/rest/request/S3TableExtInfo.java diff --git a/src/datasource-service/src/main/java/org/apache/kylin/rest/request/SamplingRequest.java b/src/common-service/src/main/java/org/apache/kylin/rest/request/SamplingRequest.java similarity index 100% rename from src/datasource-service/src/main/java/org/apache/kylin/rest/request/SamplingRequest.java rename to src/common-service/src/main/java/org/apache/kylin/rest/request/SamplingRequest.java diff --git a/src/datasource-service/src/main/java/org/apache/kylin/rest/request/StreamingRequest.java b/src/common-service/src/main/java/org/apache/kylin/rest/request/StreamingRequest.java similarity index 100% rename from src/datasource-service/src/main/java/org/apache/kylin/rest/request/StreamingRequest.java rename to src/common-service/src/main/java/org/apache/kylin/rest/request/StreamingRequest.java diff --git a/src/datasource-service/src/main/java/org/apache/kylin/rest/request/StreamingTableRequest.java b/src/common-service/src/main/java/org/apache/kylin/rest/request/StreamingTableRequest.java similarity index 100% rename from src/datasource-service/src/main/java/org/apache/kylin/rest/request/StreamingTableRequest.java rename to src/common-service/src/main/java/org/apache/kylin/rest/request/StreamingTableRequest.java diff --git a/src/datasource-service/src/main/java/org/apache/kylin/rest/request/TableDescRequest.java b/src/common-service/src/main/java/org/apache/kylin/rest/request/TableDescRequest.java similarity index 100% rename from src/datasource-service/src/main/java/org/apache/kylin/rest/request/TableDescRequest.java rename to src/common-service/src/main/java/org/apache/kylin/rest/request/TableDescRequest.java diff --git a/src/datasource-service/src/main/java/org/apache/kylin/rest/request/TableLoadRequest.java b/src/common-service/src/main/java/org/apache/kylin/rest/request/TableLoadRequest.java similarity index 100% rename from src/datasource-service/src/main/java/org/apache/kylin/rest/request/TableLoadRequest.java rename to src/common-service/src/main/java/org/apache/kylin/rest/request/TableLoadRequest.java diff --git a/src/datasource-service/src/main/java/org/apache/kylin/rest/request/TopTableRequest.java b/src/common-service/src/main/java/org/apache/kylin/rest/request/TopTableRequest.java similarity index 100% rename from src/datasource-service/src/main/java/org/apache/kylin/rest/request/TopTableRequest.java rename to src/common-service/src/main/java/org/apache/kylin/rest/request/TopTableRequest.java diff --git a/src/datasource-service/src/main/java/org/apache/kylin/rest/request/UpdateAWSTableExtDescRequest.java b/src/common-service/src/main/java/org/apache/kylin/rest/request/UpdateAWSTableExtDescRequest.java similarity index 100% rename from src/datasource-service/src/main/java/org/apache/kylin/rest/request/UpdateAWSTableExtDescRequest.java rename to src/common-service/src/main/java/org/apache/kylin/rest/request/UpdateAWSTableExtDescRequest.java diff --git a/src/datasource-service/src/main/java/org/apache/kylin/rest/request/ViewRequest.java b/src/common-service/src/main/java/org/apache/kylin/rest/request/ViewRequest.java similarity index 100% rename from src/datasource-service/src/main/java/org/apache/kylin/rest/request/ViewRequest.java rename to src/common-service/src/main/java/org/apache/kylin/rest/request/ViewRequest.java diff --git a/src/datasource-service/src/main/java/org/apache/kylin/rest/response/AutoMergeConfigResponse.java b/src/common-service/src/main/java/org/apache/kylin/rest/response/AutoMergeConfigResponse.java similarity index 100% rename from src/datasource-service/src/main/java/org/apache/kylin/rest/response/AutoMergeConfigResponse.java rename to src/common-service/src/main/java/org/apache/kylin/rest/response/AutoMergeConfigResponse.java diff --git a/src/datasource-service/src/main/java/org/apache/kylin/rest/response/BatchLoadTableResponse.java b/src/common-service/src/main/java/org/apache/kylin/rest/response/BatchLoadTableResponse.java similarity index 100% rename from src/datasource-service/src/main/java/org/apache/kylin/rest/response/BatchLoadTableResponse.java rename to src/common-service/src/main/java/org/apache/kylin/rest/response/BatchLoadTableResponse.java diff --git a/src/datasource-service/src/main/java/org/apache/kylin/rest/response/DDLResponse.java b/src/common-service/src/main/java/org/apache/kylin/rest/response/DDLResponse.java similarity index 100% rename from src/datasource-service/src/main/java/org/apache/kylin/rest/response/DDLResponse.java rename to src/common-service/src/main/java/org/apache/kylin/rest/response/DDLResponse.java diff --git a/src/datasource-service/src/main/java/org/apache/kylin/rest/response/ExistedDataRangeResponse.java b/src/common-service/src/main/java/org/apache/kylin/rest/response/ExistedDataRangeResponse.java similarity index 100% rename from src/datasource-service/src/main/java/org/apache/kylin/rest/response/ExistedDataRangeResponse.java rename to src/common-service/src/main/java/org/apache/kylin/rest/response/ExistedDataRangeResponse.java diff --git a/src/datasource-service/src/main/java/org/apache/kylin/rest/response/ExportTablesResponse.java b/src/common-service/src/main/java/org/apache/kylin/rest/response/ExportTablesResponse.java similarity index 100% rename from src/datasource-service/src/main/java/org/apache/kylin/rest/response/ExportTablesResponse.java rename to src/common-service/src/main/java/org/apache/kylin/rest/response/ExportTablesResponse.java diff --git a/src/datasource-service/src/main/java/org/apache/kylin/rest/response/LoadTableResponse.java b/src/common-service/src/main/java/org/apache/kylin/rest/response/LoadTableResponse.java similarity index 100% rename from src/datasource-service/src/main/java/org/apache/kylin/rest/response/LoadTableResponse.java rename to src/common-service/src/main/java/org/apache/kylin/rest/response/LoadTableResponse.java diff --git a/src/datasource-service/src/main/java/org/apache/kylin/rest/response/NHiveTableNameResponse.java b/src/common-service/src/main/java/org/apache/kylin/rest/response/NHiveTableNameResponse.java similarity index 100% rename from src/datasource-service/src/main/java/org/apache/kylin/rest/response/NHiveTableNameResponse.java rename to src/common-service/src/main/java/org/apache/kylin/rest/response/NHiveTableNameResponse.java diff --git a/src/datasource-service/src/main/java/org/apache/kylin/rest/response/NInitTablesResponse.java b/src/common-service/src/main/java/org/apache/kylin/rest/response/NInitTablesResponse.java similarity index 100% rename from src/datasource-service/src/main/java/org/apache/kylin/rest/response/NInitTablesResponse.java rename to src/common-service/src/main/java/org/apache/kylin/rest/response/NInitTablesResponse.java diff --git a/src/datasource-service/src/main/java/org/apache/kylin/rest/response/OpenPreReloadTableResponse.java b/src/common-service/src/main/java/org/apache/kylin/rest/response/OpenPreReloadTableResponse.java similarity index 100% rename from src/datasource-service/src/main/java/org/apache/kylin/rest/response/OpenPreReloadTableResponse.java rename to src/common-service/src/main/java/org/apache/kylin/rest/response/OpenPreReloadTableResponse.java diff --git a/src/datasource-service/src/main/java/org/apache/kylin/rest/response/PreReloadTableResponse.java b/src/common-service/src/main/java/org/apache/kylin/rest/response/PreReloadTableResponse.java similarity index 100% rename from src/datasource-service/src/main/java/org/apache/kylin/rest/response/PreReloadTableResponse.java rename to src/common-service/src/main/java/org/apache/kylin/rest/response/PreReloadTableResponse.java diff --git a/src/datasource-service/src/main/java/org/apache/kylin/rest/response/PreUnloadTableResponse.java b/src/common-service/src/main/java/org/apache/kylin/rest/response/PreUnloadTableResponse.java similarity index 100% rename from src/datasource-service/src/main/java/org/apache/kylin/rest/response/PreUnloadTableResponse.java rename to src/common-service/src/main/java/org/apache/kylin/rest/response/PreUnloadTableResponse.java diff --git a/src/datasource-service/src/main/java/org/apache/kylin/rest/response/TableDescResponse.java b/src/common-service/src/main/java/org/apache/kylin/rest/response/TableDescResponse.java similarity index 100% rename from src/datasource-service/src/main/java/org/apache/kylin/rest/response/TableDescResponse.java rename to src/common-service/src/main/java/org/apache/kylin/rest/response/TableDescResponse.java diff --git a/src/datasource-service/src/main/java/org/apache/kylin/rest/response/TableNameResponse.java b/src/common-service/src/main/java/org/apache/kylin/rest/response/TableNameResponse.java similarity index 100% rename from src/datasource-service/src/main/java/org/apache/kylin/rest/response/TableNameResponse.java rename to src/common-service/src/main/java/org/apache/kylin/rest/response/TableNameResponse.java diff --git a/src/datasource-service/src/main/java/org/apache/kylin/rest/response/TableRefresh.java b/src/common-service/src/main/java/org/apache/kylin/rest/response/TableRefresh.java similarity index 100% rename from src/datasource-service/src/main/java/org/apache/kylin/rest/response/TableRefresh.java rename to src/common-service/src/main/java/org/apache/kylin/rest/response/TableRefresh.java diff --git a/src/datasource-service/src/main/java/org/apache/kylin/rest/response/TableRefreshAll.java b/src/common-service/src/main/java/org/apache/kylin/rest/response/TableRefreshAll.java similarity index 100% rename from src/datasource-service/src/main/java/org/apache/kylin/rest/response/TableRefreshAll.java rename to src/common-service/src/main/java/org/apache/kylin/rest/response/TableRefreshAll.java diff --git a/src/datasource-service/src/main/java/org/apache/kylin/rest/response/TablesAndColumnsResponse.java b/src/common-service/src/main/java/org/apache/kylin/rest/response/TablesAndColumnsResponse.java similarity index 100% rename from src/datasource-service/src/main/java/org/apache/kylin/rest/response/TablesAndColumnsResponse.java rename to src/common-service/src/main/java/org/apache/kylin/rest/response/TablesAndColumnsResponse.java diff --git a/src/datasource-service/src/main/java/org/apache/kylin/rest/response/UpdateAWSTableExtDescResponse.java b/src/common-service/src/main/java/org/apache/kylin/rest/response/UpdateAWSTableExtDescResponse.java similarity index 100% rename from src/datasource-service/src/main/java/org/apache/kylin/rest/response/UpdateAWSTableExtDescResponse.java rename to src/common-service/src/main/java/org/apache/kylin/rest/response/UpdateAWSTableExtDescResponse.java diff --git a/src/datasource-service/src/main/java/org/apache/kylin/rest/service/CustomFileService.java b/src/common-service/src/main/java/org/apache/kylin/rest/service/CustomFileService.java similarity index 100% rename from src/datasource-service/src/main/java/org/apache/kylin/rest/service/CustomFileService.java rename to src/common-service/src/main/java/org/apache/kylin/rest/service/CustomFileService.java diff --git a/src/datasource-service/src/main/java/org/apache/kylin/rest/service/JobSupporter.java b/src/common-service/src/main/java/org/apache/kylin/rest/service/JobSupporter.java similarity index 100% rename from src/datasource-service/src/main/java/org/apache/kylin/rest/service/JobSupporter.java rename to src/common-service/src/main/java/org/apache/kylin/rest/service/JobSupporter.java diff --git a/src/datasource-service/src/main/java/org/apache/kylin/rest/service/TableFusionModelSupporter.java b/src/common-service/src/main/java/org/apache/kylin/rest/service/TableFusionModelSupporter.java similarity index 100% rename from src/datasource-service/src/main/java/org/apache/kylin/rest/service/TableFusionModelSupporter.java rename to src/common-service/src/main/java/org/apache/kylin/rest/service/TableFusionModelSupporter.java diff --git a/src/datasource-service/src/main/java/org/apache/kylin/rest/service/TableIndexPlanSupporter.java b/src/common-service/src/main/java/org/apache/kylin/rest/service/TableIndexPlanSupporter.java similarity index 100% rename from src/datasource-service/src/main/java/org/apache/kylin/rest/service/TableIndexPlanSupporter.java rename to src/common-service/src/main/java/org/apache/kylin/rest/service/TableIndexPlanSupporter.java diff --git a/src/datasource-service/src/main/java/org/apache/kylin/rest/service/TableModelSupporter.java b/src/common-service/src/main/java/org/apache/kylin/rest/service/TableModelSupporter.java similarity index 100% rename from src/datasource-service/src/main/java/org/apache/kylin/rest/service/TableModelSupporter.java rename to src/common-service/src/main/java/org/apache/kylin/rest/service/TableModelSupporter.java diff --git a/src/datasource-service/src/main/java/org/apache/kylin/rest/service/TableSamplingSupporter.java b/src/common-service/src/main/java/org/apache/kylin/rest/service/TableSamplingSupporter.java similarity index 100% rename from src/datasource-service/src/main/java/org/apache/kylin/rest/service/TableSamplingSupporter.java rename to src/common-service/src/main/java/org/apache/kylin/rest/service/TableSamplingSupporter.java diff --git a/src/datasource-service/src/main/java/org/apache/kylin/rest/source/DataSourceState.java b/src/common-service/src/main/java/org/apache/kylin/rest/source/DataSourceState.java similarity index 100% rename from src/datasource-service/src/main/java/org/apache/kylin/rest/source/DataSourceState.java rename to src/common-service/src/main/java/org/apache/kylin/rest/source/DataSourceState.java diff --git a/src/datasource-service/src/main/java/org/apache/kylin/rest/source/NHiveSourceInfo.java b/src/common-service/src/main/java/org/apache/kylin/rest/source/NHiveSourceInfo.java similarity index 100% rename from src/datasource-service/src/main/java/org/apache/kylin/rest/source/NHiveSourceInfo.java rename to src/common-service/src/main/java/org/apache/kylin/rest/source/NHiveSourceInfo.java diff --git a/src/datasource-service/src/main/java/org/apache/kylin/rest/util/TableUtils.java b/src/common-service/src/main/java/org/apache/kylin/rest/util/TableUtils.java similarity index 100% rename from src/datasource-service/src/main/java/org/apache/kylin/rest/util/TableUtils.java rename to src/common-service/src/main/java/org/apache/kylin/rest/util/TableUtils.java diff --git a/src/datasource-service/src/test/java/org/apache/kylin/rest/service/CustomFileServiceTest.java b/src/common-service/src/test/java/org/apache/kylin/rest/service/CustomFileServiceTest.java similarity index 100% rename from src/datasource-service/src/test/java/org/apache/kylin/rest/service/CustomFileServiceTest.java rename to src/common-service/src/test/java/org/apache/kylin/rest/service/CustomFileServiceTest.java diff --git a/src/common-service/src/test/java/org/apache/kylin/rest/service/ServiceTestBase.java b/src/common-service/src/test/java/org/apache/kylin/rest/service/ServiceTestBase.java index 84d40981b2..0b26f1b49e 100644 --- a/src/common-service/src/test/java/org/apache/kylin/rest/service/ServiceTestBase.java +++ b/src/common-service/src/test/java/org/apache/kylin/rest/service/ServiceTestBase.java @@ -21,10 +21,9 @@ package org.apache.kylin.rest.service; import java.util.Arrays; import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.rest.constant.Constant; import org.apache.kylin.common.util.NLocalFileMetadataTestCase; -import org.apache.kylin.engine.spark.ExecutableUtils; -import org.apache.kylin.metadata.user.ManagedUser; +import org.apache.kylin.engine.spark.utils.SparkJobFactoryUtils; +import org.apache.kylin.rest.constant.Constant; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; @@ -49,6 +48,7 @@ import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; import org.springframework.test.context.web.WebAppConfiguration; +import io.kyligence.kap.metadata.user.ManagedUser; import io.kyligence.kap.secondstorage.SecondStorageUpdater; @RunWith(SpringJUnit4ClassRunner.class) @@ -77,7 +77,7 @@ public class ServiceTestBase extends NLocalFileMetadataTestCase { @Before public void setup() { // init job factory - ExecutableUtils.initJobFactory(); + SparkJobFactoryUtils.initJobFactory(); createTestMetadata(); KylinConfig config = KylinConfig.getInstanceFromEnv(); Authentication authentication = new TestingAuthenticationToken("ADMIN", "ADMIN", Constant.ROLE_ADMIN); diff --git a/src/datasource-service/src/test/java/org/apache/kylin/rest/service/SourceTestCase.java b/src/common-service/src/test/java/org/apache/kylin/rest/service/SourceTestCase.java similarity index 97% rename from src/datasource-service/src/test/java/org/apache/kylin/rest/service/SourceTestCase.java rename to src/common-service/src/test/java/org/apache/kylin/rest/service/SourceTestCase.java index 4766784808..d4ba9d53a9 100644 --- a/src/datasource-service/src/test/java/org/apache/kylin/rest/service/SourceTestCase.java +++ b/src/common-service/src/test/java/org/apache/kylin/rest/service/SourceTestCase.java @@ -24,25 +24,25 @@ import java.util.Comparator; import java.util.List; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.NLocalFileMetadataTestCase; +import org.apache.kylin.engine.spark.utils.SparkJobFactoryUtils; import org.apache.kylin.job.execution.AbstractExecutable; import org.apache.kylin.job.execution.ExecutableState; import org.apache.kylin.job.execution.NExecutableManager; -import org.apache.kylin.metadata.project.ProjectInstance; -import org.apache.kylin.rest.constant.Constant; -import org.apache.kylin.source.jdbc.H2Database; -import org.apache.kylin.common.util.NLocalFileMetadataTestCase; -import org.apache.kylin.engine.spark.ExecutableUtils; import org.apache.kylin.metadata.cube.model.NDataflowManager; import org.apache.kylin.metadata.cube.model.NIndexPlanManager; -import org.apache.kylin.metadata.epoch.EpochManager; import org.apache.kylin.metadata.model.NDataModelManager; import org.apache.kylin.metadata.project.NProjectManager; +import org.apache.kylin.metadata.project.ProjectInstance; +import org.apache.kylin.rest.constant.Constant; +import org.apache.kylin.source.jdbc.H2Database; import org.junit.After; import org.junit.Before; import org.springframework.security.authentication.TestingAuthenticationToken; import org.springframework.security.core.Authentication; import org.springframework.security.core.context.SecurityContextHolder; +import io.kyligence.kap.metadata.epoch.EpochManager; import lombok.val; public class SourceTestCase extends NLocalFileMetadataTestCase { @@ -53,7 +53,7 @@ public class SourceTestCase extends NLocalFileMetadataTestCase { @Before public void setup() { - ExecutableUtils.initJobFactory(); + SparkJobFactoryUtils.initJobFactory(); createTestMetadata(); KylinConfig config = KylinConfig.getInstanceFromEnv(); Authentication authentication = new TestingAuthenticationToken("ADMIN", "ADMIN", Constant.ROLE_ADMIN); diff --git a/src/datasource-service/src/test/java/org/apache/kylin/rest/source/DataSourceStateTest.java b/src/common-service/src/test/java/org/apache/kylin/rest/source/DataSourceStateTest.java similarity index 100% rename from src/datasource-service/src/test/java/org/apache/kylin/rest/source/DataSourceStateTest.java rename to src/common-service/src/test/java/org/apache/kylin/rest/source/DataSourceStateTest.java diff --git a/src/streaming/src/main/java/org/apache/kylin/loader/AddToClassPathAction.java b/src/core-common/src/main/java/org/apache/kylin/loader/AddToClassPathAction.java similarity index 100% rename from src/streaming/src/main/java/org/apache/kylin/loader/AddToClassPathAction.java rename to src/core-common/src/main/java/org/apache/kylin/loader/AddToClassPathAction.java diff --git a/src/streaming/src/main/java/org/apache/kylin/loader/ParserClassLoader.java b/src/core-common/src/main/java/org/apache/kylin/loader/ParserClassLoader.java similarity index 100% rename from src/streaming/src/main/java/org/apache/kylin/loader/ParserClassLoader.java rename to src/core-common/src/main/java/org/apache/kylin/loader/ParserClassLoader.java diff --git a/src/streaming/src/main/java/org/apache/kylin/loader/ParserClassLoaderState.java b/src/core-common/src/main/java/org/apache/kylin/loader/ParserClassLoaderState.java similarity index 100% rename from src/streaming/src/main/java/org/apache/kylin/loader/ParserClassLoaderState.java rename to src/core-common/src/main/java/org/apache/kylin/loader/ParserClassLoaderState.java diff --git a/src/streaming/src/main/java/org/apache/kylin/loader/utils/ClassLoaderUtils.java b/src/core-common/src/main/java/org/apache/kylin/loader/utils/ClassLoaderUtils.java similarity index 100% rename from src/streaming/src/main/java/org/apache/kylin/loader/utils/ClassLoaderUtils.java rename to src/core-common/src/main/java/org/apache/kylin/loader/utils/ClassLoaderUtils.java diff --git a/src/streaming/src/test/java/org/apache/kylin/loader/AddToClassPathActionTest.java b/src/core-common/src/test/java/org/apache/kylin/loader/AddToClassPathActionTest.java similarity index 100% rename from src/streaming/src/test/java/org/apache/kylin/loader/AddToClassPathActionTest.java rename to src/core-common/src/test/java/org/apache/kylin/loader/AddToClassPathActionTest.java diff --git a/src/streaming/src/test/java/org/apache/kylin/loader/ParserClassLoaderStateTest.java b/src/core-common/src/test/java/org/apache/kylin/loader/ParserClassLoaderStateTest.java similarity index 100% rename from src/streaming/src/test/java/org/apache/kylin/loader/ParserClassLoaderStateTest.java rename to src/core-common/src/test/java/org/apache/kylin/loader/ParserClassLoaderStateTest.java diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/ExecutableUtils.java b/src/core-job/src/main/java/org/apache/kylin/engine/spark/ExecutableUtils.java similarity index 71% rename from src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/ExecutableUtils.java rename to src/core-job/src/main/java/org/apache/kylin/engine/spark/ExecutableUtils.java index 5d37430faa..11636e770f 100644 --- a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/ExecutableUtils.java +++ b/src/core-job/src/main/java/org/apache/kylin/engine/spark/ExecutableUtils.java @@ -17,11 +17,11 @@ */ package org.apache.kylin.engine.spark; -import org.apache.kylin.engine.spark.job.NSparkCubingJob; -import org.apache.kylin.engine.spark.job.NSparkCubingStep; -import org.apache.kylin.engine.spark.job.NSparkMergingJob; -import org.apache.kylin.engine.spark.job.NSparkSnapshotJob; -import lombok.val; +import java.util.HashSet; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang.StringUtils; import org.apache.kylin.common.KylinConfig; @@ -29,17 +29,14 @@ import org.apache.kylin.common.persistence.ResourceStore; import org.apache.kylin.job.execution.AbstractExecutable; import org.apache.kylin.metadata.cube.model.NBatchConstants; -import java.util.HashSet; -import java.util.Set; -import java.util.stream.Collectors; -import java.util.stream.Stream; - public class ExecutableUtils { + private ExecutableUtils() {} + public static ResourceStore getRemoteStore(KylinConfig config, AbstractExecutable buildTask) { - val buildStepUrl = buildTask.getParam(NBatchConstants.P_OUTPUT_META_URL); + String buildStepUrl = buildTask.getParam(NBatchConstants.P_OUTPUT_META_URL); - val buildConfig = KylinConfig.createKylinConfig(config); + KylinConfig buildConfig = KylinConfig.createKylinConfig(config); buildConfig.setMetadataUrl(buildStepUrl); return ResourceStore.getKylinMetaStore(buildConfig); } @@ -66,19 +63,4 @@ public class ExecutableUtils { return buildTask.getTargetPartitions(); } - public static boolean needBuildSnapshots(AbstractExecutable buildTask) { - if (buildTask instanceof NSparkCubingStep) { - String p = buildTask.getParam(NBatchConstants.P_NEED_BUILD_SNAPSHOTS); - return StringUtils.isBlank(p) || Boolean.parseBoolean(p); - } else { - return false; - } - } - - public static void initJobFactory() { - // register jobFactory in static function - new NSparkCubingJob(); - new NSparkMergingJob(); - new NSparkSnapshotJob(); - } } diff --git a/src/data-loading-service/pom.xml b/src/data-loading-service/pom.xml index aab8c449be..1895c8e45c 100644 --- a/src/data-loading-service/pom.xml +++ b/src/data-loading-service/pom.xml @@ -45,7 +45,7 @@ </dependency> <dependency> <groupId>org.apache.kylin</groupId> - <artifactId>kylin-datasource-service</artifactId> + <artifactId>kylin-common-service</artifactId> <type>test-jar</type> <scope>test</scope> </dependency> diff --git a/src/datasource-service/src/test/java/org/apache/kylin/rest/service/StreamingTableServiceTest.java b/src/datasource-service/src/test/java/org/apache/kylin/rest/service/StreamingTableServiceTest.java index a5f92508b0..2d65b47f59 100644 --- a/src/datasource-service/src/test/java/org/apache/kylin/rest/service/StreamingTableServiceTest.java +++ b/src/datasource-service/src/test/java/org/apache/kylin/rest/service/StreamingTableServiceTest.java @@ -29,7 +29,7 @@ import org.apache.kylin.common.exception.KylinException; import org.apache.kylin.common.msg.MsgPicker; import org.apache.kylin.common.scheduler.EventBusFactory; import org.apache.kylin.common.util.NLocalFileMetadataTestCase; -import org.apache.kylin.engine.spark.ExecutableUtils; +import org.apache.kylin.engine.spark.utils.SparkJobFactoryUtils; import org.apache.kylin.junit.rule.TransactionExceptedException; import org.apache.kylin.metadata.datatype.DataType; import org.apache.kylin.metadata.model.ColumnDesc; @@ -94,7 +94,7 @@ public class StreamingTableServiceTest extends NLocalFileMetadataTestCase { @Before public void setup() { - ExecutableUtils.initJobFactory(); + SparkJobFactoryUtils.initJobFactory(); createTestMetadata(); Authentication authentication = new TestingAuthenticationToken("ADMIN", "ADMIN", Constant.ROLE_ADMIN); SecurityContextHolder.getContext().setAuthentication(authentication); diff --git a/src/job-service/src/test/java/org/apache/kylin/rest/config/initialize/JobSchedulerTest.java b/src/job-service/src/test/java/org/apache/kylin/rest/config/initialize/JobSchedulerTest.java index 326d0be149..10d392434b 100644 --- a/src/job-service/src/test/java/org/apache/kylin/rest/config/initialize/JobSchedulerTest.java +++ b/src/job-service/src/test/java/org/apache/kylin/rest/config/initialize/JobSchedulerTest.java @@ -18,22 +18,24 @@ package org.apache.kylin.rest.config.initialize; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; -import org.apache.kylin.engine.spark.job.ExecutableAddCuboidHandler; -import org.apache.kylin.engine.spark.job.ExecutableAddSegmentHandler; -import org.apache.kylin.engine.spark.job.ExecutableMergeOrRefreshHandler; -import org.apache.kylin.engine.spark.job.NSparkCubingJob; -import org.apache.kylin.engine.spark.job.NSparkMergingJob; -import lombok.extern.slf4j.Slf4j; -import lombok.val; -import lombok.var; +import static org.apache.kylin.common.exception.code.ErrorCodeServer.JOB_CREATE_CHECK_FAIL; +import static org.apache.kylin.common.exception.code.ErrorCodeServer.JOB_CREATE_CHECK_INDEX_FAIL; +import static org.apache.kylin.common.exception.code.ErrorCodeServer.JOB_CREATE_CHECK_SEGMENT_FAIL; +import static org.apache.kylin.common.exception.code.ErrorCodeServer.JOB_CREATE_EXCEPTION; +import static org.apache.kylin.common.exception.code.ErrorCodeServer.JOB_REFRESH_CHECK_INDEX_FAIL; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + import org.apache.commons.lang.StringUtils; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.exception.KylinException; import org.apache.kylin.common.persistence.transaction.UnitOfWork; import org.apache.kylin.common.util.NLocalFileMetadataTestCase; -import org.apache.kylin.engine.spark.ExecutableUtils; +import org.apache.kylin.engine.spark.utils.SparkJobFactoryUtils; import org.apache.kylin.job.engine.JobEngineConfig; import org.apache.kylin.job.execution.AbstractExecutable; import org.apache.kylin.job.execution.NExecutableManager; @@ -56,17 +58,17 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Set; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; -import static org.apache.kylin.common.exception.code.ErrorCodeServer.JOB_CREATE_CHECK_FAIL; -import static org.apache.kylin.common.exception.code.ErrorCodeServer.JOB_CREATE_CHECK_INDEX_FAIL; -import static org.apache.kylin.common.exception.code.ErrorCodeServer.JOB_CREATE_CHECK_SEGMENT_FAIL; -import static org.apache.kylin.common.exception.code.ErrorCodeServer.JOB_CREATE_EXCEPTION; -import static org.apache.kylin.common.exception.code.ErrorCodeServer.JOB_REFRESH_CHECK_INDEX_FAIL; +import io.kyligence.kap.engine.spark.job.ExecutableAddCuboidHandler; +import io.kyligence.kap.engine.spark.job.ExecutableAddSegmentHandler; +import io.kyligence.kap.engine.spark.job.ExecutableMergeOrRefreshHandler; +import io.kyligence.kap.engine.spark.job.NSparkCubingJob; +import io.kyligence.kap.engine.spark.job.NSparkMergingJob; +import lombok.val; +import lombok.var; +import lombok.extern.slf4j.Slf4j; @Slf4j public class JobSchedulerTest extends NLocalFileMetadataTestCase { @@ -80,7 +82,7 @@ public class JobSchedulerTest extends NLocalFileMetadataTestCase { @Before public void setup() { - ExecutableUtils.initJobFactory(); + SparkJobFactoryUtils.initJobFactory(); createTestMetadata(); prepareSegment(); startScheduler(); diff --git a/src/kylin-it/src/test/java/org/apache/kylin/event/ModelSemanticTest.java b/src/kylin-it/src/test/java/org/apache/kylin/event/ModelSemanticTest.java index 6205165e12..2dbf10ab87 100644 --- a/src/kylin-it/src/test/java/org/apache/kylin/event/ModelSemanticTest.java +++ b/src/kylin-it/src/test/java/org/apache/kylin/event/ModelSemanticTest.java @@ -29,7 +29,7 @@ import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.JsonUtil; import org.apache.kylin.common.util.RandomUtil; import org.apache.kylin.common.util.TempMetadataBuilder; -import org.apache.kylin.engine.spark.ExecutableUtils; +import org.apache.kylin.engine.spark.utils.SparkJobFactoryUtils; import org.apache.kylin.job.engine.JobEngineConfig; import org.apache.kylin.job.execution.NExecutableManager; import org.apache.kylin.job.impl.threadpool.NDefaultScheduler; @@ -89,7 +89,7 @@ public class ModelSemanticTest extends AbstractMVCIntegrationTestCase { @BeforeClass public static void beforeClass() { - ExecutableUtils.initJobFactory(); + SparkJobFactoryUtils.initJobFactory(); if (Shell.MAC) overwriteSystemPropBeforeClass("org.xerial.snappy.lib.name", "libsnappyjava.jnilib");//for snappy diff --git a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ModelServiceSemanticUpdateTest.java b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ModelServiceSemanticUpdateTest.java index f0ddc80d02..8ceca79b41 100644 --- a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ModelServiceSemanticUpdateTest.java +++ b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ModelServiceSemanticUpdateTest.java @@ -39,9 +39,7 @@ import org.apache.kylin.common.persistence.transaction.UnitOfWork; import org.apache.kylin.common.util.JsonUtil; import org.apache.kylin.common.util.NLocalFileMetadataTestCase; import org.apache.kylin.cube.model.SelectRule; -import org.apache.kylin.engine.spark.ExecutableUtils; -import org.apache.kylin.engine.spark.job.ExecutableAddCuboidHandler; -import org.apache.kylin.engine.spark.job.NSparkCubingJob; +import org.apache.kylin.engine.spark.utils.SparkJobFactoryUtils; import org.apache.kylin.job.execution.AbstractExecutable; import org.apache.kylin.job.execution.ExecutableState; import org.apache.kylin.job.execution.NExecutableManager; @@ -98,6 +96,9 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import io.kyligence.kap.engine.spark.job.ExecutableAddCuboidHandler; +import io.kyligence.kap.engine.spark.job.NSparkCubingJob; +import io.kyligence.kap.metadata.recommendation.candidate.JdbcRawRecStore; import lombok.val; import lombok.var; import lombok.extern.slf4j.Slf4j; @@ -150,7 +151,7 @@ public class ModelServiceSemanticUpdateTest extends NLocalFileMetadataTestCase { @Before public void setup() { - ExecutableUtils.initJobFactory(); + SparkJobFactoryUtils.initJobFactory(); SecurityContextHolder.getContext() .setAuthentication(new TestingAuthenticationToken("ADMIN", "ADMIN", Constant.ROLE_ADMIN)); ReflectionTestUtils.setField(aclEvaluate, "aclUtil", Mockito.spy(AclUtil.class)); diff --git a/src/query-service/src/test/java/org/apache/kylin/rest/service/LocalFileMetadataTestCase.java b/src/query-service/src/test/java/org/apache/kylin/rest/service/LocalFileMetadataTestCase.java index 59a05ce246..dfcb33b83d 100644 --- a/src/query-service/src/test/java/org/apache/kylin/rest/service/LocalFileMetadataTestCase.java +++ b/src/query-service/src/test/java/org/apache/kylin/rest/service/LocalFileMetadataTestCase.java @@ -20,11 +20,11 @@ package org.apache.kylin.rest.service; import java.util.List; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.NLocalFileMetadataTestCase; +import org.apache.kylin.engine.spark.utils.SparkJobFactoryUtils; import org.apache.kylin.job.execution.AbstractExecutable; import org.apache.kylin.job.execution.ExecutableState; import org.apache.kylin.job.execution.NExecutableManager; -import org.apache.kylin.common.util.NLocalFileMetadataTestCase; -import org.apache.kylin.engine.spark.ExecutableUtils; import org.junit.Before; import lombok.val; @@ -36,7 +36,7 @@ public class LocalFileMetadataTestCase extends NLocalFileMetadataTestCase { @Before public void setup() { - ExecutableUtils.initJobFactory(); + SparkJobFactoryUtils.initJobFactory(); } protected List<AbstractExecutable> getRunningExecutables(String project, String model) { diff --git a/src/query-service/src/test/java/org/apache/kylin/rest/service/ModelServiceQueryTest.java b/src/query-service/src/test/java/org/apache/kylin/rest/service/ModelServiceQueryTest.java index 3a147a4ba8..2d9e81b01b 100644 --- a/src/query-service/src/test/java/org/apache/kylin/rest/service/ModelServiceQueryTest.java +++ b/src/query-service/src/test/java/org/apache/kylin/rest/service/ModelServiceQueryTest.java @@ -27,8 +27,8 @@ import java.util.Arrays; import java.util.List; import org.apache.kylin.common.scheduler.EventBusFactory; -import org.apache.kylin.engine.spark.ExecutableUtils; import org.apache.kylin.engine.spark.utils.ComputedColumnEvalUtil; +import org.apache.kylin.engine.spark.utils.SparkJobFactoryUtils; import org.apache.kylin.junit.rule.TransactionExceptedException; import org.apache.kylin.metadata.cube.model.NDataflow; import org.apache.kylin.metadata.cube.model.NDataflowManager; @@ -139,7 +139,7 @@ public class ModelServiceQueryTest extends SourceTestCase { EventBusFactory.getInstance().register(eventListener, true); EventBusFactory.getInstance().register(modelBrokenListener, false); - ExecutableUtils.initJobFactory(); + SparkJobFactoryUtils.initJobFactory(); } @After diff --git a/src/second-storage/clickhouse-it/src/test/java/io/kyligence/kap/newten/clickhouse/ClickHouseSimpleITTest.java b/src/second-storage/clickhouse-it/src/test/java/io/kyligence/kap/newten/clickhouse/ClickHouseSimpleITTest.java index 5de8ff0aa2..f8ba1dba4d 100644 --- a/src/second-storage/clickhouse-it/src/test/java/io/kyligence/kap/newten/clickhouse/ClickHouseSimpleITTest.java +++ b/src/second-storage/clickhouse-it/src/test/java/io/kyligence/kap/newten/clickhouse/ClickHouseSimpleITTest.java @@ -42,9 +42,9 @@ import org.apache.kylin.common.persistence.ResourceStore; import org.apache.kylin.common.persistence.transaction.UnitOfWork; import org.apache.kylin.common.util.Pair; import org.apache.kylin.common.util.Unsafe; -import org.apache.kylin.engine.spark.ExecutableUtils; import org.apache.kylin.engine.spark.IndexDataConstructor; import org.apache.kylin.engine.spark.NLocalWithSparkSessionTest; +import org.apache.kylin.engine.spark.utils.SparkJobFactoryUtils; import org.apache.kylin.job.SecondStorageJobParamUtil; import org.apache.kylin.job.common.ExecutableUtil; import org.apache.kylin.job.engine.JobEngineConfig; @@ -202,7 +202,7 @@ public class ClickHouseSimpleITTest extends NLocalWithSparkSessionTest implement secondStorageEndpoint.setModelService(modelService); openSecondStorageEndpoint.setSecondStorageEndpoint(secondStorageEndpoint); prepareMeta(); - ExecutableUtils.initJobFactory(); + SparkJobFactoryUtils.initJobFactory(); doSetup(); diff --git a/src/second-storage/clickhouse-it/src/test/java/io/kyligence/kap/secondstorage/test/EnableScheduler.java b/src/second-storage/clickhouse-it/src/test/java/io/kyligence/kap/secondstorage/test/EnableScheduler.java index 63a1cd1416..b57a4b6f04 100644 --- a/src/second-storage/clickhouse-it/src/test/java/io/kyligence/kap/secondstorage/test/EnableScheduler.java +++ b/src/second-storage/clickhouse-it/src/test/java/io/kyligence/kap/secondstorage/test/EnableScheduler.java @@ -18,7 +18,7 @@ package io.kyligence.kap.secondstorage.test; import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.engine.spark.ExecutableUtils; +import org.apache.kylin.engine.spark.utils.SparkJobFactoryUtils; import org.apache.kylin.job.engine.JobEngineConfig; import org.apache.kylin.job.impl.threadpool.NDefaultScheduler; @@ -31,7 +31,7 @@ public class EnableScheduler extends EnableLocalMeta { @Override protected void before() throws Throwable { super.before(); - ExecutableUtils.initJobFactory(); + SparkJobFactoryUtils.initJobFactory(); overwriteSystemProp("kylin.job.scheduler.poll-interval-second", "1"); NDefaultScheduler scheduler = NDefaultScheduler.getInstance(project); scheduler.init(new JobEngineConfig(KylinConfig.getInstanceFromEnv())); diff --git a/src/second-storage/core-ui/src/test/java/org/apache/kylin/rest/service/ModelServiceWithSecondStorageTest.java b/src/second-storage/core-ui/src/test/java/org/apache/kylin/rest/service/ModelServiceWithSecondStorageTest.java index 787b16f4c6..9ef91ef98d 100644 --- a/src/second-storage/core-ui/src/test/java/org/apache/kylin/rest/service/ModelServiceWithSecondStorageTest.java +++ b/src/second-storage/core-ui/src/test/java/org/apache/kylin/rest/service/ModelServiceWithSecondStorageTest.java @@ -27,8 +27,8 @@ import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.scheduler.EventBusFactory; import org.apache.kylin.common.util.JsonUtil; import org.apache.kylin.common.util.NLocalFileMetadataTestCase; -import org.apache.kylin.engine.spark.ExecutableUtils; import org.apache.kylin.engine.spark.utils.ComputedColumnEvalUtil; +import org.apache.kylin.engine.spark.utils.SparkJobFactoryUtils; import org.apache.kylin.metadata.cube.model.NIndexPlanManager; import org.apache.kylin.metadata.model.ManagementType; import org.apache.kylin.metadata.model.NDataModel; @@ -166,7 +166,7 @@ public class ModelServiceWithSecondStorageTest extends NLocalFileMetadataTestCas } EventBusFactory.getInstance().register(eventListener, true); EventBusFactory.getInstance().register(modelBrokenListener, false); - ExecutableUtils.initJobFactory(); + SparkJobFactoryUtils.initJobFactory(); } @After diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/merger/AfterBuildResourceMerger.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/merger/AfterBuildResourceMerger.java index cfd0d474ad..f0582567f0 100644 --- a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/merger/AfterBuildResourceMerger.java +++ b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/merger/AfterBuildResourceMerger.java @@ -26,6 +26,7 @@ import org.apache.commons.collections.CollectionUtils; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.persistence.ResourceStore; import org.apache.kylin.engine.spark.ExecutableUtils; +import org.apache.kylin.engine.spark.utils.SparkJobFactoryUtils; import org.apache.kylin.job.execution.AbstractExecutable; import org.apache.kylin.job.execution.JobTypeEnum; import org.apache.kylin.metadata.cube.model.NDataLayout; @@ -74,7 +75,7 @@ public class AfterBuildResourceMerger extends SparkJobMetadataMerger { NDataLayout[] nDataLayouts = merge(dataFlowId, segmentIds, layoutIds, buildResourceStore, abstractExecutable.getJobType(), partitionIds); NDataflow dataflow = NDataflowManager.getInstance(getConfig(), getProject()).getDataflow(dataFlowId); - if (ExecutableUtils.needBuildSnapshots(abstractExecutable)) { + if (SparkJobFactoryUtils.needBuildSnapshots(abstractExecutable)) { mergeSnapshotMeta(dataflow, buildResourceStore); } mergeTableExtMeta(dataflow, buildResourceStore); @@ -110,6 +111,7 @@ public class AfterBuildResourceMerger extends SparkJobMetadataMerger { dfUpdate.setToUpdateSegs(theSeg); dfUpdate.setToRemoveSegs(toRemoveSegments.toArray(new NDataSegment[toRemoveSegments.size()])); dfUpdate.setToAddOrUpdateLayouts(theSeg.getSegDetails().getLayouts().toArray(new NDataLayout[0])); + localDataflowManager.updateDataflow(dfUpdate); updateIndexPlan(flowName, remoteStore); return dfUpdate.getToAddOrUpdateLayouts(); diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/merger/AfterMergeOrRefreshResourceMerger.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/merger/AfterMergeOrRefreshResourceMerger.java index 9ac88ccb0b..3935675a82 100644 --- a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/merger/AfterMergeOrRefreshResourceMerger.java +++ b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/merger/AfterMergeOrRefreshResourceMerger.java @@ -27,6 +27,7 @@ import org.apache.commons.collections.CollectionUtils; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.persistence.ResourceStore; import org.apache.kylin.engine.spark.ExecutableUtils; +import org.apache.kylin.engine.spark.utils.SparkJobFactoryUtils; import org.apache.kylin.job.execution.AbstractExecutable; import org.apache.kylin.job.execution.JobTypeEnum; import org.apache.kylin.metadata.cube.model.NDataLayout; @@ -190,7 +191,7 @@ public class AfterMergeOrRefreshResourceMerger extends SparkJobMetadataMerger { NDataLayout[] nDataLayouts = merge(dataFlowId, segmentIds, layoutIds, buildResourceStore, abstractExecutable.getJobType(), partitionIds); NDataflow dataflow = NDataflowManager.getInstance(getConfig(), getProject()).getDataflow(dataFlowId); - if (ExecutableUtils.needBuildSnapshots(abstractExecutable)) { + if (SparkJobFactoryUtils.needBuildSnapshots(abstractExecutable)) { mergeSnapshotMeta(dataflow, buildResourceStore); } mergeTableExtMeta(dataflow, buildResourceStore); diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/utils/SparkJobFactoryUtils.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/utils/SparkJobFactoryUtils.java new file mode 100644 index 0000000000..f83d6eb712 --- /dev/null +++ b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/utils/SparkJobFactoryUtils.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.engine.spark.utils; + +import org.apache.commons.lang.StringUtils; +import org.apache.kylin.job.execution.AbstractExecutable; +import org.apache.kylin.metadata.cube.model.NBatchConstants; + +import io.kyligence.kap.engine.spark.job.NSparkCubingJob; +import io.kyligence.kap.engine.spark.job.NSparkCubingStep; +import io.kyligence.kap.engine.spark.job.NSparkMergingJob; +import io.kyligence.kap.engine.spark.job.NSparkSnapshotJob; + +public class SparkJobFactoryUtils { + + private SparkJobFactoryUtils() {} + + public static void initJobFactory() { + // register jobFactory in static function + new NSparkCubingJob(); + new NSparkMergingJob(); + new NSparkSnapshotJob(); + } + + public static boolean needBuildSnapshots(AbstractExecutable buildTask) { + if (buildTask instanceof NSparkCubingStep) { + String p = buildTask.getParam(NBatchConstants.P_NEED_BUILD_SNAPSHOTS); + return StringUtils.isBlank(p) || Boolean.parseBoolean(p); + } else { + return false; + } + } +} diff --git a/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/NLocalWithSparkSessionTest.java b/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/NLocalWithSparkSessionTest.java index e1bc429877..e366dc1bc2 100644 --- a/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/NLocalWithSparkSessionTest.java +++ b/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/NLocalWithSparkSessionTest.java @@ -35,6 +35,7 @@ import org.apache.kylin.common.util.RandomUtil; import org.apache.kylin.common.util.TempMetadataBuilder; import org.apache.kylin.engine.spark.job.NSparkMergingJob; import org.apache.kylin.engine.spark.merger.AfterMergeOrRefreshResourceMerger; +import org.apache.kylin.engine.spark.utils.SparkJobFactoryUtils; import org.apache.kylin.job.engine.JobEngineConfig; import org.apache.kylin.job.execution.ExecutableState; import org.apache.kylin.job.execution.NExecutableManager; @@ -148,7 +149,7 @@ public class NLocalWithSparkSessionTest extends NLocalFileMetadataTestCase imple overwriteSystemProp("kylin.engine.spark.build-job-progress-reporter", // "org.apache.kylin.engine.spark.job.MockJobProgressReport"); this.createTestMetadata(); - ExecutableUtils.initJobFactory(); + SparkJobFactoryUtils.initJobFactory(); Random r = new Random(10000); zkTestServer = new TestingServer(r.nextInt(), true); overwriteSystemProp("kylin.env.zookeeper-connect-string", zkTestServer.getConnectString()); diff --git a/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/job/JobManagerTest.java b/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/job/JobManagerTest.java index 805e8ae20f..b1ca18779e 100644 --- a/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/job/JobManagerTest.java +++ b/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/job/JobManagerTest.java @@ -37,7 +37,7 @@ import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.exception.KylinException; import org.apache.kylin.common.persistence.transaction.UnitOfWork; import org.apache.kylin.common.util.NLocalFileMetadataTestCase; -import org.apache.kylin.engine.spark.ExecutableUtils; +import org.apache.kylin.engine.spark.utils.SparkJobFactoryUtils; import org.apache.kylin.job.engine.JobEngineConfig; import org.apache.kylin.job.execution.AbstractExecutable; import org.apache.kylin.job.execution.ExecutableParams; @@ -88,7 +88,7 @@ public class JobManagerTest extends NLocalFileMetadataTestCase { public void setup() throws Exception { this.createTestMetadata(); jobManager = JobManager.getInstance(KylinConfig.getInstanceFromEnv(), PROJECT); - ExecutableUtils.initJobFactory(); + SparkJobFactoryUtils.initJobFactory(); } private void assertExeption(Functions f, String msg) {