This is an automated email from the ASF dual-hosted git repository. xxyu pushed a commit to branch kylin-on-parquet-v2 in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this push: new ea7a8b4 KYLIN-4766 Delete job tmp and segment file after the job is discarded ea7a8b4 is described below commit ea7a8b454fd884fd504b17ec8dabcade89e76b0f Author: yaqian.zhang <598593...@qq.com> AuthorDate: Fri Sep 25 14:55:41 2020 +0800 KYLIN-4766 Delete job tmp and segment file after the job is discarded --- .../apache/kylin/engine/spark/job/NSparkCubingJob.java | 17 +++++++++++++++++ .../java/org/apache/kylin/rest/service/JobService.java | 9 ++++++++- 2 files changed, 25 insertions(+), 1 deletion(-) diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingJob.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingJob.java index c2e615d..da00f6d 100644 --- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingJob.java +++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingJob.java @@ -18,6 +18,7 @@ package org.apache.kylin.engine.spark.job; +import java.io.IOException; import java.text.SimpleDateFormat; import java.util.Date; import java.util.Locale; @@ -32,8 +33,10 @@ import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.engine.mr.CubingJob; import org.apache.kylin.engine.mr.steps.CubingExecutableUtil; +import org.apache.kylin.engine.spark.metadata.cube.PathManager; import org.apache.kylin.engine.spark.utils.MetaDumpUtil; import org.apache.kylin.metadata.MetadataConstants; +import org.apache.kylin.metadata.model.SegmentStatusEnum; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.spark_project.guava.base.Preconditions; @@ -139,4 +142,18 @@ public class NSparkCubingJob extends CubingJob { public void setCube(CubeInstance cube) { this.cube = cube; } + + public void cleanupAfterJobDiscard() { + try { + 
PathManager.deleteJobTempPath(getConfig(), getParam(MetadataConstants.P_PROJECT_NAME), + getParam(MetadataConstants.P_JOB_ID)); + + CubeManager cubeManager = CubeManager.getInstance(getConfig()); + CubeInstance cube = cubeManager.getCube(getParam(MetadataConstants.P_CUBE_NAME)); + CubeSegment segment = cube.getSegment(getParam(MetadataConstants.SEGMENT_NAME), SegmentStatusEnum.NEW); + PathManager.deleteSegmentParquetStoragePath(cube, segment); + } catch (IOException e) { + logger.warn("Delete resource file failed after job be discarded, due to", e); + } + } } diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java index 1ce6521..f7be7d1 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java @@ -48,6 +48,7 @@ import org.apache.kylin.engine.mr.LookupSnapshotBuildJob; import org.apache.kylin.engine.mr.common.CubeJobLockUtil; import org.apache.kylin.engine.mr.common.JobInfoConverter; import org.apache.kylin.engine.mr.steps.CubingExecutableUtil; +import org.apache.kylin.engine.spark.job.NSparkCubingJob; import org.apache.kylin.engine.spark.metadata.cube.source.SourceFactory; import org.apache.kylin.job.JobInstance; import org.apache.kylin.job.JobSearchResult; @@ -644,9 +645,15 @@ public class JobService extends BasicService implements InitializingBean { "The job " + job.getId() + " has already been finished and cannot be discarded."); } + AbstractExecutable executable = getExecutableManager().getJob(job.getId()); + if (job.getStatus() != JobStatusEnum.DISCARDED) { - AbstractExecutable executable = getExecutableManager().getJob(job.getId()); if (executable instanceof CubingJob) { + //Clean up job tmp and segment storage from hdfs after job be discarded + if (executable instanceof NSparkCubingJob) { + ((NSparkCubingJob) executable).cleanupAfterJobDiscard(); + } + 
cancelCubingJobInner((CubingJob) executable); //release global mr hive dict lock if exists if (executable.getStatus().isFinalState()) {