This is an automated email from the ASF dual-hosted git repository. xxyu pushed a commit to branch kylin-on-parquet-v2 in repository https://gitbox.apache.org/repos/asf/kylin.git
commit cfdcb9f1efecd39ad709db7257cac6bc90a98bbd Author: Zhichao Zhang <441586...@qq.com> AuthorDate: Tue Aug 25 23:25:39 2020 +0800 KYLIN-4699 Delete job_tmp path after build/merge successfully --- .../kylin/job/execution/AbstractExecutable.java | 2 + .../engine/spark/metadata/cube/PathManager.java | 44 ++++++++++++++++++++++ .../kylin/engine/spark/job/NSparkCubingStep.java | 9 +++++ .../NSparkUpdateMetaAndCleanupAfterMergeStep.java | 15 +++++--- .../engine/spark/LocalWithSparkSessionTest.java | 22 ++++++++++- .../org/apache/kylin/rest/service/CubeService.java | 11 ++---- 6 files changed, 88 insertions(+), 15 deletions(-) diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java index 908e154..33d42f3 100644 --- a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java +++ b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java @@ -198,6 +198,8 @@ public abstract class AbstractExecutable implements Executable, Idempotent { } catch (Throwable e) { logger.error("error running Executable: {}", this.toString()); catchedException = e; + } finally { + cleanup(); } retry++; realException = catchedException != null ? catchedException diff --git a/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/metadata/cube/PathManager.java b/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/metadata/cube/PathManager.java index 5353523..0484bfc 100644 --- a/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/metadata/cube/PathManager.java +++ b/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/metadata/cube/PathManager.java @@ -19,10 +19,15 @@ package org.apache.kylin.engine.spark.metadata.cube; import java.io.File; +import java.io.IOException; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.fs.Path; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.HadoopUtil; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.cube.CubeSegment; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,4 +48,43 @@ public final class PathManager { String hdfsWorkDir = cube.getConfig().getHdfsWorkingDirectory(cube.getProject()); return hdfsWorkDir + "parquet" + File.separator + cube.getName() + File.separator + segName + "_" + identifier; } + + /** + * Delete segment path + */ + public static boolean deleteSegmentParquetStoragePath(CubeInstance cube, CubeSegment segment) throws IOException { + if (cube == null || segment == null) { + return false; + } + String path = getSegmentParquetStoragePath(cube, segment.getName(), + segment.getStorageLocationIdentifier()); + logger.info("Deleting segment parquet path {}", path); + HadoopUtil.deletePath(HadoopUtil.getCurrentConfiguration(), new Path(path)); + return true; + } + + /** + * Delete job temp path + */ + public static boolean deleteJobTempPath(KylinConfig kylinConfig, String project, String jobId) { + if (StringUtils.isEmpty(jobId) || StringUtils.isEmpty(project)) { + return false; + } + Path jobTmpPath = new Path(kylinConfig.getJobTmpDir(project)); + try { + Path[] toDeletedPath = + HadoopUtil.getFilteredPath(jobTmpPath.getFileSystem(HadoopUtil.getCurrentConfiguration()), + jobTmpPath, jobId); + if (toDeletedPath != null && toDeletedPath.length > 0) { + for (Path deletedPath : toDeletedPath) { + logger.info("Deleting job tmp path {}", deletedPath.toString()); + HadoopUtil.deletePath(HadoopUtil.getCurrentConfiguration(), deletedPath); + } + } + } catch (IOException e) { + logger.error("Can not delete job tmp path: {}", jobTmpPath); + return false; + } + return true; + } } diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingStep.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingStep.java index bdc68c6..a235290 100644 --- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingStep.java +++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingStep.java @@ -24,11 +24,13 @@ import com.google.common.collect.Sets; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.CubeUpdate; import org.apache.kylin.engine.mr.CubingJob; +import org.apache.kylin.engine.spark.metadata.cube.PathManager; import org.apache.kylin.engine.spark.utils.MetaDumpUtil; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.job.constant.ExecutableConstants; +import org.apache.kylin.job.exception.ExecuteException; import org.apache.kylin.metadata.MetadataConstants; import org.apache.kylin.metadata.model.SegmentStatusEnum; import org.apache.kylin.metadata.realization.RealizationStatusEnum; @@ -119,4 +121,11 @@ public class NSparkCubingStep extends NSparkExecutable { joblogInfo.put(CubingJob.CUBE_SIZE_BYTES, String.valueOf(cube.getSizeKB())); return joblogInfo; } + + @Override + public void cleanup() throws ExecuteException { + // delete job tmp dir + PathManager.deleteJobTempPath(getConfig(), getParam(MetadataConstants.P_PROJECT_NAME), + getParam(MetadataConstants.P_JOB_ID)); + } } diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkUpdateMetaAndCleanupAfterMergeStep.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkUpdateMetaAndCleanupAfterMergeStep.java index e0a6704..b3d6a0c 100644 --- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkUpdateMetaAndCleanupAfterMergeStep.java +++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkUpdateMetaAndCleanupAfterMergeStep.java @@ -20,11 +20,8 @@ package org.apache.kylin.engine.spark.job; import java.io.IOException; -import org.apache.commons.lang3.StringUtils; -import org.apache.hadoop.fs.Path; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.persistence.ResourceStore; -import org.apache.kylin.common.util.HadoopUtil; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; @@ -54,11 +51,10 @@ public class NSparkUpdateMetaAndCleanupAfterMergeStep extends NSparkExecutable { CubeSegment mergedSegment = cube.getSegmentById(mergedSegmentUuid); Segments<CubeSegment> mergingSegments = cube.getMergingSegments(mergedSegment); + // delete segments which were merged for (CubeSegment segment : mergingSegments) { - String path = PathManager.getSegmentParquetStoragePath(cube, segment.getName(), - segment.getStorageLocationIdentifier()); try { - HadoopUtil.deletePath(HadoopUtil.getCurrentConfiguration(), new Path(path)); + PathManager.deleteSegmentParquetStoragePath(cube, segment); } catch (IOException e) { throw new ExecuteException("Can not delete segment: " + segment.getName() + ", in cube: " + cube.getName()); } @@ -76,4 +72,11 @@ public class NSparkUpdateMetaAndCleanupAfterMergeStep extends NSparkExecutable { AfterMergeOrRefreshResourceMerger merger = new AfterMergeOrRefreshResourceMerger(buildConfig); merger.merge(cubeId, mergedSegmentId, resourceStore, getParam(MetadataConstants.P_JOB_TYPE)); } + + @Override + public void cleanup() throws ExecuteException { + // delete job tmp dir + PathManager.deleteJobTempPath(getConfig(), getParam(MetadataConstants.P_PROJECT_NAME), + getParam(MetadataConstants.P_JOB_ID)); + } } diff --git a/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/LocalWithSparkSessionTest.java b/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/LocalWithSparkSessionTest.java index 2480a6e..53d84e2 100644 --- a/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/LocalWithSparkSessionTest.java +++ b/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/LocalWithSparkSessionTest.java @@ -31,6 +31,7 @@ import org.apache.kylin.common.util.TempMetadataBuilder; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.engine.mr.CubingJob; import org.apache.kylin.engine.spark.builder.CreateFlatTable; import org.apache.kylin.engine.spark.job.NSparkCubingJob; import org.apache.kylin.engine.spark.job.NSparkCubingStep; @@ -45,6 +46,7 @@ import org.apache.kylin.job.execution.ExecutableManager; import org.apache.kylin.job.execution.ExecutableState; import org.apache.kylin.job.impl.threadpool.DefaultScheduler; import org.apache.kylin.job.lock.MockJobLock; +import org.apache.kylin.metadata.MetadataConstants; import org.apache.kylin.metadata.TableMetadataManager; import org.apache.kylin.metadata.model.ColumnDesc; import org.apache.kylin.metadata.model.DataModelManager; @@ -184,7 +186,11 @@ public class LocalWithSparkSessionTest extends LocalFileMetadataTestCase impleme // launch the job execMgr.addJob(job); - return wait(job); + ExecutableState result = wait(job); + + checkJobTmpPathDeleted(config, job); + + return result; } protected ExecutableState mergeSegments(String cubeName, long start, long end, boolean force) throws Exception { @@ -202,6 +208,7 @@ public class LocalWithSparkSessionTest extends LocalFileMetadataTestCase impleme segment.getStorageLocationIdentifier()); Assert.assertFalse(HadoopUtil.getFileSystem(path).exists(new Path(HadoopUtil.makeURI(path)))); } + checkJobTmpPathDeleted(config, mergeJob); return result; } @@ -306,4 +313,17 @@ public class LocalWithSparkSessionTest extends LocalFileMetadataTestCase impleme public String getProject() { return "default"; } + + protected void checkJobTmpPathDeleted(KylinConfig config, CubingJob job) { + String project = job.getParam(MetadataConstants.P_PROJECT_NAME); + String jobId = job.getParam(MetadataConstants.P_JOB_ID); + Path jobTmpPath = new Path(config.getJobTmpDir(project)); + try { + Path[] jobTmpPathArray = + HadoopUtil.getFilteredPath(jobTmpPath.getFileSystem(HadoopUtil.getCurrentConfiguration()), + jobTmpPath, jobId); + Assert.assertTrue(jobTmpPathArray.length == 0); + } catch (IOException e) { + } + } } diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java index 2aac66e..b50bdc0 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java @@ -29,12 +29,10 @@ import java.util.Map; import java.util.Set; import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.fs.Path; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.lock.DistributedLock; import org.apache.kylin.common.persistence.RootPersistentEntity; import org.apache.kylin.common.util.CliCommandExecutor; -import org.apache.kylin.common.util.HadoopUtil; import org.apache.kylin.common.util.Pair; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; @@ -630,18 +628,15 @@ public class CubeService extends BasicService implements InitializingBean { return cubeInstance; } - // clean segment data in hbase and hdfs + // clean segment data in hdfs private void cleanSegmentStorage(CubeInstance cube, List<CubeSegment> toRemoveSegs) throws IOException { if (!KylinConfig.getInstanceFromEnv().cleanStorageAfterDelOperation()) { return; } if (toRemoveSegs != null && !toRemoveSegs.isEmpty()) { - for (CubeSegment seg : toRemoveSegs) { - String path = PathManager.getSegmentParquetStoragePath(cube, seg.getName(), - seg.getStorageLocationIdentifier()); - logger.info("Deleting segment HDFS path {}", path); - HadoopUtil.deletePath(HadoopUtil.getCurrentConfiguration(), new Path(path)); + for (CubeSegment segment : toRemoveSegs) { + PathManager.deleteSegmentParquetStoragePath(cube, segment); } } }