KYLIN-978 small update based on yerui's patch Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/985e1fb2 Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/985e1fb2 Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/985e1fb2
Branch: refs/heads/KYLIN-1011 Commit: 985e1fb2047d296ac89b6cf1230915bbb13dfe99 Parents: 738422e Author: shaofengshi <[email protected]> Authored: Tue Sep 22 11:40:26 2015 +0800 Committer: shaofengshi <[email protected]> Committed: Tue Sep 22 11:40:26 2015 +0800 ---------------------------------------------------------------------- .../kylin/storage/hbase/steps/HBaseMRSteps.java | 22 +++++------------ .../steps/HDFSPathGarbageCollectionStep.java | 26 +++++++++----------- 2 files changed, 17 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/985e1fb2/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java index f9e9b15..4901512 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java @@ -153,17 +153,12 @@ public class HBaseMRSteps extends JobBuilderSupport { jobFlow.addTask(createMergeGCStep()); - List<String> toDeletePathsOnHadoopCluster = new ArrayList<>(); - toDeletePathsOnHadoopCluster.addAll(getMergingHDFSPaths()); - - List<String> toDeletePathsOnHbaseCluster = new ArrayList<>(); - toDeletePathsOnHbaseCluster.add(getRowkeyDistributionOutputPath(jobId)); - toDeletePathsOnHbaseCluster.add(getHFilePath(jobId)); + List<String> toDeletePaths = new ArrayList<>(); + toDeletePaths.addAll(getMergingHDFSPaths()); HDFSPathGarbageCollectionStep step = new HDFSPathGarbageCollectionStep(); step.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION); - step.setDeletePathsOnHadoopCluster(toDeletePathsOnHadoopCluster); - 
step.setDeletePathsOnHBaseCluster(toDeletePathsOnHbaseCluster); + step.setDeletePaths(toDeletePaths); step.setJobId(jobId); jobFlow.addTask(step); @@ -172,17 +167,12 @@ public class HBaseMRSteps extends JobBuilderSupport { public void addCubingGarbageCollectionSteps(DefaultChainedExecutable jobFlow) { String jobId = jobFlow.getId(); - List<String> toDeletePathsOnHadoopCluster = new ArrayList<>(); - toDeletePathsOnHadoopCluster.add(getFactDistinctColumnsPath(jobId)); - - List<String> toDeletePathsOnHbaseCluster = new ArrayList<>(); - toDeletePathsOnHbaseCluster.add(getRowkeyDistributionOutputPath(jobId)); - toDeletePathsOnHbaseCluster.add(getHFilePath(jobId)); + List<String> toDeletePaths = new ArrayList<>(); + toDeletePaths.add(getFactDistinctColumnsPath(jobId)); HDFSPathGarbageCollectionStep step = new HDFSPathGarbageCollectionStep(); step.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION); - step.setDeletePathsOnHadoopCluster(toDeletePathsOnHadoopCluster); - step.setDeletePathsOnHBaseCluster(toDeletePathsOnHbaseCluster); + step.setDeletePaths(toDeletePaths); step.setJobId(jobId); jobFlow.addTask(step); http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/985e1fb2/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HDFSPathGarbageCollectionStep.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HDFSPathGarbageCollectionStep.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HDFSPathGarbageCollectionStep.java index 2ae8ca8..f9f0b80 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HDFSPathGarbageCollectionStep.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HDFSPathGarbageCollectionStep.java @@ -19,7 +19,6 @@ package org.apache.kylin.storage.hbase.steps; import com.google.common.collect.Lists; import org.apache.commons.lang.StringUtils; -import 
org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.kylin.engine.mr.HadoopUtil; @@ -40,6 +39,7 @@ import java.util.List; */ public class HDFSPathGarbageCollectionStep extends AbstractExecutable { + public static final String TO_DELETE_PATHS = "toDeletePaths"; private StringBuffer output; private JobEngineConfig config; @@ -52,8 +52,12 @@ public class HDFSPathGarbageCollectionStep extends AbstractExecutable { protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException { try { config = new JobEngineConfig(context.getConfig()); - dropHdfsPathOnCluster(getDeletePathsOnHadoopCluster(), FileSystem.get(HadoopUtil.getCurrentConfiguration())); - dropHdfsPathOnCluster(getDeletePathsOnHBaseCluster(), FileSystem.get(HadoopUtil.getCurrentHBaseConfiguration())); + List<String> toDeletePaths = getDeletePaths(); + dropHdfsPathOnCluster(toDeletePaths, FileSystem.get(HadoopUtil.getCurrentConfiguration())); + + if (StringUtils.isNotEmpty(context.getConfig().getHBaseClusterFs())) { + dropHdfsPathOnCluster(toDeletePaths, FileSystem.get(HadoopUtil.getCurrentHBaseConfiguration())); + } } catch (IOException e) { logger.error("job:" + getId() + " execute finished with exception", e); output.append("\n").append(e.getLocalizedMessage()); @@ -94,24 +98,16 @@ public class HDFSPathGarbageCollectionStep extends AbstractExecutable { } } - public void setDeletePathsOnHadoopCluster(List<String> deletePaths) { - setArrayParam("toDeletePathsOnHadoopCluster", deletePaths); - } - - public void setDeletePathsOnHBaseCluster(List<String> deletePaths) { - setArrayParam("toDeletePathsOnHBaseCluster", deletePaths); + public void setDeletePaths(List<String> deletePaths) { + setArrayParam(TO_DELETE_PATHS, deletePaths); } public void setJobId(String jobId) { setParam("jobId", jobId); } - public List<String> getDeletePathsOnHadoopCluster() { - return getArrayParam("toDeletePathsOnHadoopCluster"); - } - - public 
List<String> getDeletePathsOnHBaseCluster() { - return getArrayParam("toDeletePathsOnHBaseCluster"); + public List<String> getDeletePaths() { + return getArrayParam(TO_DELETE_PATHS); } public String getJobId() {
