This is an automated email from the ASF dual-hosted git repository. xxyu pushed a commit to branch kylin-on-parquet-v2 in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 2d998f181dbeec0a5efb42e2bf94329897e8e150 Author: Zhichao Zhang <441586...@qq.com> AuthorDate: Wed Aug 26 22:49:25 2020 +0800 KYLIN-4695 Automatically start sparder (for query) application when start kylin instance --- .../kylin/job/execution/AbstractExecutable.java | 4 +- .../org/apache/kylin/job/execution/Idempotent.java | 2 +- .../kylin/engine/spark/job/NSparkCubingStep.java | 9 ++-- .../NSparkUpdateMetaAndCleanupAfterMergeStep.java | 8 +-- .../kylin/rest/init/InitialSparkerContext.java | 59 ++++++++++++++++++++++ server/src/main/resources/applicationContext.xml | 1 + 6 files changed, 74 insertions(+), 9 deletions(-) diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java index 33d42f3..7f6ba2f 100644 --- a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java +++ b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java @@ -199,7 +199,7 @@ public abstract class AbstractExecutable implements Executable, Idempotent { logger.error("error running Executable: {}", this.toString()); catchedException = e; } finally { - cleanup(); + cleanup(result); } retry++; realException = catchedException != null ? catchedException @@ -251,7 +251,7 @@ public abstract class AbstractExecutable implements Executable, Idempotent { protected abstract ExecuteResult doWork(ExecutableContext context) throws ExecuteException, PersistentException; @Override - public void cleanup() throws ExecuteException { + public void cleanup(ExecuteResult result) throws ExecuteException { } diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/Idempotent.java b/core-job/src/main/java/org/apache/kylin/job/execution/Idempotent.java index 98c950e..49d73a4 100644 --- a/core-job/src/main/java/org/apache/kylin/job/execution/Idempotent.java +++ b/core-job/src/main/java/org/apache/kylin/job/execution/Idempotent.java @@ -24,5 +24,5 @@ import org.apache.kylin.job.exception.ExecuteException; */ public interface Idempotent { - void cleanup() throws ExecuteException; + void cleanup(ExecuteResult result) throws ExecuteException; } diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingStep.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingStep.java index a235290..074ff17 100644 --- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingStep.java +++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingStep.java @@ -31,6 +31,7 @@ import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.job.constant.ExecutableConstants; import org.apache.kylin.job.exception.ExecuteException; +import org.apache.kylin.job.execution.ExecuteResult; import org.apache.kylin.metadata.MetadataConstants; import org.apache.kylin.metadata.model.SegmentStatusEnum; import org.apache.kylin.metadata.realization.RealizationStatusEnum; @@ -123,9 +124,11 @@ public class NSparkCubingStep extends NSparkExecutable { } @Override - public void cleanup() throws ExecuteException { + public void cleanup(ExecuteResult result) throws ExecuteException { // delete job tmp dir - PathManager.deleteJobTempPath(getConfig(), getParam(MetadataConstants.P_PROJECT_NAME), - getParam(MetadataConstants.P_JOB_ID)); + if (result != null && result.state().ordinal() == ExecuteResult.State.SUCCEED.ordinal()) { + PathManager.deleteJobTempPath(getConfig(), getParam(MetadataConstants.P_PROJECT_NAME), + getParam(MetadataConstants.P_JOB_ID)); + } } } diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkUpdateMetaAndCleanupAfterMergeStep.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkUpdateMetaAndCleanupAfterMergeStep.java index b3d6a0c..2ccd810 100644 --- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkUpdateMetaAndCleanupAfterMergeStep.java +++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkUpdateMetaAndCleanupAfterMergeStep.java @@ -74,9 +74,11 @@ public class NSparkUpdateMetaAndCleanupAfterMergeStep extends NSparkExecutable { } @Override - public void cleanup() throws ExecuteException { + public void cleanup(ExecuteResult result) throws ExecuteException { // delete job tmp dir - PathManager.deleteJobTempPath(getConfig(), getParam(MetadataConstants.P_PROJECT_NAME), - getParam(MetadataConstants.P_JOB_ID)); + if (result != null && result.state().ordinal() == ExecuteResult.State.SUCCEED.ordinal()) { + PathManager.deleteJobTempPath(getConfig(), getParam(MetadataConstants.P_PROJECT_NAME), + getParam(MetadataConstants.P_JOB_ID)); + } } } diff --git a/server-base/src/main/java/org/apache/kylin/rest/init/InitialSparkerContext.java b/server-base/src/main/java/org/apache/kylin/rest/init/InitialSparkerContext.java new file mode 100644 index 0000000..34f977a --- /dev/null +++ b/server-base/src/main/java/org/apache/kylin/rest/init/InitialSparkerContext.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.rest.init; + +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.kylin.common.KylinConfig; +import org.apache.spark.sql.SparderContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.InitializingBean; + +import java.io.File; +import java.nio.file.Paths; + +/** + * Created by zhangzc on 8/26/20. + */ +public class InitialSparkerContext implements InitializingBean { + + private static final Logger logger = LoggerFactory.getLogger(InitialSparkerContext.class); + + @Override + public void afterPropertiesSet() throws Exception { + runInitialSparder(); + } + + private void runInitialSparder() { + logger.info("Spark is starting....."); + SparderContext.init(); + final String kylinHome = StringUtils.defaultIfBlank(KylinConfig.getKylinHome(), "./"); + final File appidFile = Paths.get(kylinHome, "sparkappid").toFile(); + String appid = null; + try { + appid = SparderContext.getSparkSession().sparkContext().applicationId(); + FileUtils.writeStringToFile(appidFile, appid); + logger.info("Spark application id is {}", appid); + } catch (Exception e) { + logger.error("Failed to generate spark application id[{}] file", + StringUtils.defaultString(appid), e); + } + } +} diff --git a/server/src/main/resources/applicationContext.xml b/server/src/main/resources/applicationContext.xml index b873622..dcf249b 100644 --- a/server/src/main/resources/applicationContext.xml +++ b/server/src/main/resources/applicationContext.xml @@ -38,6 +38,7 @@ <aop:aspectj-autoproxy/> <bean class="org.apache.kylin.rest.init.InitialTaskManager" /> + <bean class="org.apache.kylin.rest.init.InitialSparkerContext" /> <context:component-scan base-package="org.apache.kylin.rest"/>