This is an automated email from the ASF dual-hosted git repository.

nic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push:
     new db184be  Kylin-4037 Fix can't Cleanup Data in Hbase's HDFS Storage When Deploy Apache Kylin with Standalone HBase Cluster (#687)
db184be is described below

commit db184be59f99b8dae7fab5f1aa857ed00d937135
Author: wangxiaojing123 <49143236+wangxiaojing...@users.noreply.github.com>
AuthorDate: Tue Jul 16 14:52:31 2019 +0800

    Kylin-4037 Fix can't Cleanup Data in Hbase's HDFS Storage When Deploy Apache Kylin with Standalone HBase Cluster (#687)

    * Kylin-4037 Can't Cleanup Data in Hbase's HDFS Storage When Deploy Apache Kylin with Standalone HBase Cluster

    * Minor, code format
---
 .../apache/kylin/rest/job/StorageCleanupJob.java   | 108 ++++++++++++---------
 .../kylin/rest/job/StorageCleanupJobTest.java      |   8 +-
 .../kylin/storage/hbase/HBaseConnection.java       |   9 ++
 3 files changed, 74 insertions(+), 51 deletions(-)
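For context on the bug: with a standalone HBase cluster, the same Kylin job
working directory is reachable through two different NameNodes, and Hadoop
paths qualified against different clusters never compare equal. A minimal
sketch of the mismatch (illustration only; the cluster URIs below are made up,
not taken from this commit):

    import org.apache.hadoop.fs.Path;

    public class PathMismatchDemo {
        public static void main(String[] args) {
            // The same job working dir, as qualified on each cluster:
            Path onMainCluster = new Path("hdfs://main-cluster/kylin/kylin_metadata/kylin-091a0322");
            Path onHBaseCluster = new Path("hdfs://hbase-cluster/kylin/kylin_metadata/kylin-091a0322");

            // Different authority => not equal, so a remove() keyed on one form
            // silently misses entries listed in the other form.
            System.out.println(onMainCluster.equals(onHBaseCluster)); // false

            // Stripping scheme and authority yields a cluster-neutral form that
            // can be re-qualified against whichever FileSystem owns the path.
            System.out.println(Path.getPathWithoutSchemeAndAuthority(onMainCluster)
                    .equals(Path.getPathWithoutSchemeAndAuthority(onHBaseCluster))); // true
        }
    }

This is why the patch below strips the scheme and authority before listing, and
re-qualifies each path against the file system it actually belongs to.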
diff --git a/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanupJob.java b/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanupJob.java
index 3b2e787..c8e73de 100755
--- a/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanupJob.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanupJob.java
@@ -38,7 +38,6 @@ import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.AbstractApplication;
 import org.apache.kylin.common.util.CliCommandExecutor;
@@ -57,6 +56,7 @@ import org.apache.kylin.job.execution.ExecutableState;
 import org.apache.kylin.metadata.MetadataConstants;
 import org.apache.kylin.source.ISourceMetadataExplorer;
 import org.apache.kylin.source.SourceManager;
+import org.apache.kylin.storage.hbase.HBaseConnection;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -74,27 +74,27 @@ public class StorageCleanupJob extends AbstractApplication {
             .withDescription("Warning: will delete all kylin intermediate hive tables").create("force");
 
     protected static final Logger logger = LoggerFactory.getLogger(StorageCleanupJob.class);
-    
+
     // ============================================================================
-    
+
     final protected KylinConfig config;
     final protected FileSystem hbaseFs;
     final protected FileSystem defaultFs;
     final protected ExecutableManager executableManager;
-    
+
     protected boolean delete = false;
     protected boolean force = false;
-    
+
     private List<String> hiveGarbageTables = Collections.emptyList();
     private List<String> hbaseGarbageTables = Collections.emptyList();
     private List<String> hdfsGarbageFiles = Collections.emptyList();
     private long hdfsGarbageFileBytes = 0;
 
     public StorageCleanupJob() throws IOException {
-        this(KylinConfig.getInstanceFromEnv(), HadoopUtil.getWorkingFileSystem(),
-                HadoopUtil.getWorkingFileSystem(HBaseConfiguration.create()));
+        this(KylinConfig.getInstanceFromEnv(), HadoopUtil.getWorkingFileSystem(), HBaseConnection
+                .getFileSystemInHBaseCluster(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory()));
     }
-    
+
     protected StorageCleanupJob(KylinConfig config, FileSystem defaultFs, FileSystem hbaseFs) {
         this.config = config;
         this.defaultFs = defaultFs;
@@ -105,11 +105,11 @@ public class StorageCleanupJob extends AbstractApplication {
     public void setDelete(boolean delete) {
         this.delete = delete;
     }
-    
+
     public void setForce(boolean force) {
         this.force = force;
     }
-    
+
     public List<String> getHiveGarbageTables() {
         return hiveGarbageTables;
     }
@@ -141,13 +141,13 @@ public class StorageCleanupJob extends AbstractApplication {
         logger.info("force option value: '" + optionsHelper.getOptionValue(OPTION_FORCE) + "'");
         delete = Boolean.parseBoolean(optionsHelper.getOptionValue(OPTION_DELETE));
         force = Boolean.parseBoolean(optionsHelper.getOptionValue(OPTION_FORCE));
-        
+
         cleanup();
     }
-    
+
     // function entrance
     public void cleanup() throws Exception {
-        
+
         cleanUnusedIntermediateHiveTable();
         cleanUnusedHBaseTables();
         cleanUnusedHdfsFiles();
@@ -169,27 +169,19 @@ public class StorageCleanupJob extends AbstractApplication {
         }
     }
 
-    protected class UnusedHdfsFileCollector {
-        LinkedHashSet<Pair<FileSystem, String>> list = new LinkedHashSet<>();
-
-        public void add(FileSystem fs, String path) {
-            list.add(Pair.newPair(fs, path));
-        }
-    }
-
     private void cleanUnusedHdfsFiles() throws IOException {
-        
+
         UnusedHdfsFileCollector collector = new UnusedHdfsFileCollector();
         collectUnusedHdfsFiles(collector);
-        
+
         if (collector.list.isEmpty()) {
             logger.info("No HDFS files to clean up");
             return;
         }
-        
+
         long garbageBytes = 0;
         List<String> garbageList = new ArrayList<>();
-        
+
         for (Pair<FileSystem, String> entry : collector.list) {
             FileSystem fs = entry.getKey();
             String path = entry.getValue();
@@ -199,7 +191,7 @@ public class StorageCleanupJob extends AbstractApplication {
                 ContentSummary sum = fs.getContentSummary(new Path(path));
                 if (sum != null)
                     garbageBytes += sum.getLength();
-                
+
                 if (delete) {
                     logger.info("Deleting HDFS path " + path);
                     fs.delete(new Path(path), true);
@@ -210,32 +202,33 @@ public class StorageCleanupJob extends AbstractApplication {
                 logger.error("Error dealing unused HDFS path " + path, e);
             }
         }
-        
+
         hdfsGarbageFileBytes = garbageBytes;
         hdfsGarbageFiles = garbageList;
     }
-    
+
     protected void collectUnusedHdfsFiles(UnusedHdfsFileCollector collector) throws IOException {
         if (StringUtils.isNotEmpty(config.getHBaseClusterFs())) {
-            cleanUnusedHdfsFiles(hbaseFs, collector);
+            cleanUnusedHdfsFiles(hbaseFs, collector, true);
         }
-        cleanUnusedHdfsFiles(defaultFs, collector);
+        cleanUnusedHdfsFiles(defaultFs, collector, false);
     }
-
-    private void cleanUnusedHdfsFiles(FileSystem fs, UnusedHdfsFileCollector collector) throws IOException {
+
+    private void cleanUnusedHdfsFiles(FileSystem fs, UnusedHdfsFileCollector collector, boolean hbaseFs)
+            throws IOException {
         final JobEngineConfig engineConfig = new JobEngineConfig(config);
         final CubeManager cubeMgr = CubeManager.getInstance(config);
-        
+
         List<String> allHdfsPathsNeedToBeDeleted = new ArrayList<String>();
-        
+
         try {
-            FileStatus[] fStatus = fs.listStatus(new Path(config.getHdfsWorkingDirectory()));
+            FileStatus[] fStatus = fs
+                    .listStatus(Path.getPathWithoutSchemeAndAuthority(new Path(config.getHdfsWorkingDirectory())));
             if (fStatus != null) {
                 for (FileStatus status : fStatus) {
                     String path = status.getPath().getName();
                     if (path.startsWith("kylin-")) {
-                        String kylinJobPath = engineConfig.getHdfsWorkingDirectory() + path;
-                        allHdfsPathsNeedToBeDeleted.add(kylinJobPath);
+                        allHdfsPathsNeedToBeDeleted.add(status.getPath().toString());
                     }
                 }
             }
@@ -249,6 +242,13 @@ public class StorageCleanupJob extends AbstractApplication {
             final ExecutableState state = executableManager.getOutput(jobId).getState();
             if (!state.isFinalState()) {
                 String path = JobBuilderSupport.getJobWorkingDir(engineConfig.getHdfsWorkingDirectory(), jobId);
+
+                if (hbaseFs) {
+                    path = HBaseConnection.makeQualifiedPathInHBaseCluster(path);
+                } else {//Compatible with local fs, unit tests, mockito
+                    Path p = Path.getPathWithoutSchemeAndAuthority(new Path(path));
+                    path = HadoopUtil.getFileSystem(path).makeQualified(p).toString();
+                }
                 allHdfsPathsNeedToBeDeleted.remove(path);
                 logger.info("Skip " + path + " from deletion list, as the path belongs to job " + jobId
                         + " with status " + state);
@@ -261,13 +261,19 @@ public class StorageCleanupJob extends AbstractApplication {
                     String jobUuid = seg.getLastBuildJobID();
                     if (jobUuid != null && jobUuid.equals("") == false) {
                         String path = JobBuilderSupport.getJobWorkingDir(engineConfig.getHdfsWorkingDirectory(), jobUuid);
+                        if (hbaseFs) {
+                            path = HBaseConnection.makeQualifiedPathInHBaseCluster(path);
+                        } else {//Compatible with local fs, unit tests, mockito
+                            Path p = Path.getPathWithoutSchemeAndAuthority(new Path(path));
+                            path = HadoopUtil.getFileSystem(path).makeQualified(p).toString();
+                        }
                         allHdfsPathsNeedToBeDeleted.remove(path);
                         logger.info("Skip " + path + " from deletion list, as the path belongs to segment " + seg
                                 + " of cube " + cube.getName());
                     }
                 }
             }
-            
+
             // collect the garbage
             for (String path : allHdfsPathsNeedToBeDeleted)
                 collector.add(fs, path);
@@ -283,12 +289,12 @@ public class StorageCleanupJob extends AbstractApplication {
                 throw e;
             }
         }
-    
+
     private void cleanUnusedIntermediateHiveTableInternal() throws Exception {
         final int uuidLength = 36;
         final String prefix = MetadataConstants.KYLIN_INTERMEDIATE_PREFIX;
         final String uuidPattern = "[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}";
-        
+
         List<String> hiveTableNames = getHiveTables();
         Iterable<String> kylinIntermediates = Iterables.filter(hiveTableNames, new Predicate<String>() {
             @Override
@@ -310,7 +316,7 @@ public class StorageCleanupJob extends AbstractApplication {
 
             try {
                 String segmentId = getSegmentIdFromJobId(jobId);
-                if (segmentId != null) {//some jobs are not cubing jobs 
+                if (segmentId != null) {//some jobs are not cubing jobs
                     segmentId2JobId.put(segmentId, jobId);
                 }
             } catch (Exception ex) {
@@ -369,7 +375,7 @@ public class StorageCleanupJob extends AbstractApplication {
             logger.info("No Hive tables to clean up");
             return;
         }
-        
+
         if (delete) {
             try {
                 deleteHiveTables(allHiveTablesNeedToBeDeleted, segmentId2JobId);
@@ -382,13 +388,13 @@ public class StorageCleanupJob extends AbstractApplication {
             }
         }
     }
-    
+
     // override by test
     protected List<String> getHiveTables() throws Exception {
         ISourceMetadataExplorer explr = SourceManager.getDefaultSource().getSourceMetadataExplorer();
         return explr.listTables(config.getHiveDatabaseForIntermediateTable());
     }
-    
+
     // override by test
     protected CliCommandExecutor getCliCommandExecutor() throws IOException {
         return config.getCliCommandExecutor();
     }
@@ -398,7 +404,7 @@ public class StorageCleanupJob extends AbstractApplication {
             throws IOException {
         final JobEngineConfig engineConfig = new JobEngineConfig(config);
         final int uuidLength = 36;
-        
+
         final String useDatabaseHql = "USE " + config.getHiveDatabaseForIntermediateTable() + ";";
         final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
         hiveCmdBuilder.addStatement(useDatabaseHql);
@@ -407,8 +413,8 @@ public class StorageCleanupJob extends AbstractApplication {
             logger.info("Deleting Hive table " + delHive);
         }
         getCliCommandExecutor().execute(hiveCmdBuilder.build());
-        
-        // If kylin.source.hive.keep-flat-table, some intermediate table might be kept. 
+
+        // If kylin.source.hive.keep-flat-table, some intermediate table might be kept.
         // Do delete external path.
         for (String tableToDelete : allHiveTablesNeedToBeDeleted) {
             String uuid = tableToDelete.substring(tableToDelete.length() - uuidLength, tableToDelete.length());
@@ -449,4 +455,12 @@ public class StorageCleanupJob extends AbstractApplication {
         return false;
     }
 
+    protected class UnusedHdfsFileCollector {
+        LinkedHashSet<Pair<FileSystem, String>> list = new LinkedHashSet<>();
+
+        public void add(FileSystem fs, String path) {
+            list.add(Pair.newPair(fs, path));
+        }
+    }
+
 }
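The two if (hbaseFs) blocks above implement the same normalization; the sketch
below factors it into a standalone helper to make the intent explicit. This is
illustration only -- PathNormalizer and qualify() are hypothetical names, while
the helpers they call (makeQualifiedPathInHBaseCluster, HadoopUtil.getFileSystem)
are the ones actually used in the patch:

    import org.apache.hadoop.fs.Path;
    import org.apache.kylin.common.util.HadoopUtil;
    import org.apache.kylin.storage.hbase.HBaseConnection;

    class PathNormalizer {
        // Return the fully qualified form of a job working dir on the cluster
        // that actually stores it, so it can match entries produced by
        // fs.listStatus() on that same cluster.
        static String qualify(String path, boolean onHBaseCluster) {
            if (onHBaseCluster) {
                // Qualify against the HBase cluster's file system.
                return HBaseConnection.makeQualifiedPathInHBaseCluster(path);
            }
            // Compatible with local fs, unit tests, mockito: drop any scheme,
            // then let the owning FileSystem qualify the bare path.
            Path p = Path.getPathWithoutSchemeAndAuthority(new Path(path));
            return HadoopUtil.getFileSystem(path).makeQualified(p).toString();
        }
    }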
diff --git a/server-base/src/test/java/org/apache/kylin/rest/job/StorageCleanupJobTest.java b/server-base/src/test/java/org/apache/kylin/rest/job/StorageCleanupJobTest.java
index e494884..3e4d9cb 100644
--- a/server-base/src/test/java/org/apache/kylin/rest/job/StorageCleanupJobTest.java
+++ b/server-base/src/test/java/org/apache/kylin/rest/job/StorageCleanupJobTest.java
@@ -84,15 +84,15 @@ public class StorageCleanupJobTest {
         FileStatus f2 = mock(FileStatus.class);
         FileStatus f3 = mock(FileStatus.class);
         // only remove FINISHED and DISCARDED job intermediate files, so this exclude.
-        when(f1.getPath()).thenReturn(new Path("kylin-091a0322-249c-43e7-91df-205603ab6883"));
+        when(f1.getPath()).thenReturn(new Path("file:///tmp/examples/test_metadata/kylin-091a0322-249c-43e7-91df-205603ab6883"));
         // remove every segment working dir from deletion list, so this exclude.
-        when(f2.getPath()).thenReturn(new Path("kylin-bcf2f125-9b0b-40dd-9509-95ec59b31333"));
-        when(f3.getPath()).thenReturn(new Path("kylin-to-be-delete"));
+        when(f2.getPath()).thenReturn(new Path("file:///tmp/examples/test_metadata/kylin-bcf2f125-9b0b-40dd-9509-95ec59b31333"));
+        when(f3.getPath()).thenReturn(new Path("file:///tmp/examples/test_metadata/kylin-to-be-delete"));
         statuses[0] = f1;
         statuses[1] = f2;
         statuses[2] = f3;
-        when(mockFs.listStatus(p1)).thenReturn(statuses);
+        when(mockFs.listStatus(Path.getPathWithoutSchemeAndAuthority(p1))).thenReturn(statuses);
         Path p2 = new Path("file:///tmp/examples/test_metadata/kylin-to-be-delete");
         when(mockFs.exists(p2)).thenReturn(true);
     }
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java
index 6678418..43c1a36 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java
@@ -238,6 +238,15 @@ public class HBaseConnection {
         return fs.makeQualified(path).toString();
     }
 
+    public static FileSystem getFileSystemInHBaseCluster(String inPath) {
+        Path path = new Path(inPath);
+        path = Path.getPathWithoutSchemeAndAuthority(path);
+
+        FileSystem fs = HadoopUtil.getFileSystem(path, getCurrentHBaseConfiguration()); // Must be HBase's FS, not working FS
+        return fs;
+    }
+
     // ============================================================================
 
     // returned Connection can be shared by multiple threads and does not require close()
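For completeness, this is roughly how the new constructor wiring exercises the
helper above; a sketch under assumptions (the wiring class and printout are not
part of the commit):

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.kylin.common.KylinConfig;
    import org.apache.kylin.common.util.HadoopUtil;
    import org.apache.kylin.storage.hbase.HBaseConnection;

    class CleanupWiring {
        static void show() throws Exception {
            KylinConfig config = KylinConfig.getInstanceFromEnv();

            // Working FS of the main (build) cluster, from the default Hadoop conf.
            FileSystem defaultFs = HadoopUtil.getWorkingFileSystem();

            // FS of the standalone HBase cluster: the working directory's own
            // scheme/authority is deliberately discarded and the bare path is
            // resolved against the HBase configuration instead.
            FileSystem hbaseFs = HBaseConnection
                    .getFileSystemInHBaseCluster(config.getHdfsWorkingDirectory());

            System.out.println("default FS: " + defaultFs.getUri());
            System.out.println("hbase FS:   " + hbaseFs.getUri());
        }
    }

With both file systems in hand, StorageCleanupJob can list and delete stale
kylin-* job directories on the HBase cluster as well as on the main cluster,
which is the behavior KYLIN-4037 reports as broken.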