This is an automated email from the ASF dual-hosted git repository.

nic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/master by this push:
     new db184be  Kylin-4037 Fix can't Cleanup Data in Hbase's HDFS Storage When Deploy Apache Kylin with Standalone HBase Cluster (#687)
db184be is described below

commit db184be59f99b8dae7fab5f1aa857ed00d937135
Author: wangxiaojing123 <49143236+wangxiaojing...@users.noreply.github.com>
AuthorDate: Tue Jul 16 14:52:31 2019 +0800

    Kylin-4037 Fix can't Cleanup Data in Hbase's HDFS Storage When Deploy Apache Kylin with Standalone HBase Cluster (#687)
    
    * Kylin-4037 Can't Cleanup Data in Hbase's HDFS Storage When Deploy Apache Kylin with Standalone HBase Cluster
    
    * Minor, code format
---
 .../apache/kylin/rest/job/StorageCleanupJob.java   | 108 ++++++++++++---------
 .../kylin/rest/job/StorageCleanupJobTest.java      |   8 +-
 .../kylin/storage/hbase/HBaseConnection.java       |   9 ++
 3 files changed, 74 insertions(+), 51 deletions(-)
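
The heart of the fix is path qualification: the cleanup job now keeps the fully qualified paths returned by listStatus and qualifies the job/segment working directories it must skip against the same FileSystem (the HBase cluster's FS when config.getHBaseClusterFs() is set). The sketch below, with made-up host names and paths, only illustrates why comparing an unqualified working-directory string against a qualified one never matches; it is not code from this patch.

    import org.apache.hadoop.fs.Path;

    public class QualifiedPathSketch {
        public static void main(String[] args) {
            // Hypothetical values: the collected entry carries the HBase
            // cluster's scheme/authority, the skip entry does not.
            String collected = "hdfs://hbase-nn:8020/kylin/kylin_metadata/kylin-091a0322";
            String toSkip = "/kylin/kylin_metadata/kylin-091a0322";

            // Raw string comparison fails, so a running job's directory would
            // stay on (or never leave) the deletion list.
            System.out.println(collected.equals(toSkip)); // false

            // Stripping scheme and authority (or qualifying both sides against
            // the same cluster, as HBaseConnection.makeQualifiedPathInHBaseCluster
            // does) makes the comparison meaningful.
            String stripped = Path.getPathWithoutSchemeAndAuthority(new Path(collected)).toString();
            System.out.println(stripped.equals(toSkip)); // true
        }
    }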

diff --git a/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanupJob.java b/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanupJob.java
index 3b2e787..c8e73de 100755
--- a/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanupJob.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanupJob.java
@@ -38,7 +38,6 @@ import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.AbstractApplication;
 import org.apache.kylin.common.util.CliCommandExecutor;
@@ -57,6 +56,7 @@ import org.apache.kylin.job.execution.ExecutableState;
 import org.apache.kylin.metadata.MetadataConstants;
 import org.apache.kylin.source.ISourceMetadataExplorer;
 import org.apache.kylin.source.SourceManager;
+import org.apache.kylin.storage.hbase.HBaseConnection;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -74,27 +74,27 @@ public class StorageCleanupJob extends AbstractApplication {
             .withDescription("Warning: will delete all kylin intermediate hive 
tables").create("force");
 
     protected static final Logger logger = LoggerFactory.getLogger(StorageCleanupJob.class);
-    
+
     // ============================================================================
-    
+
     final protected KylinConfig config;
     final protected FileSystem hbaseFs;
     final protected FileSystem defaultFs;
     final protected ExecutableManager executableManager;
-    
+
     protected boolean delete = false;
     protected boolean force = false;
-    
+
     private List<String> hiveGarbageTables = Collections.emptyList();
     private List<String> hbaseGarbageTables = Collections.emptyList();
     private List<String> hdfsGarbageFiles = Collections.emptyList();
     private long hdfsGarbageFileBytes = 0;
 
     public StorageCleanupJob() throws IOException {
-        this(KylinConfig.getInstanceFromEnv(), HadoopUtil.getWorkingFileSystem(),
-                HadoopUtil.getWorkingFileSystem(HBaseConfiguration.create()));
+        this(KylinConfig.getInstanceFromEnv(), HadoopUtil.getWorkingFileSystem(), HBaseConnection
+                .getFileSystemInHBaseCluster(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory()));
     }
-    
+
     protected StorageCleanupJob(KylinConfig config, FileSystem defaultFs, FileSystem hbaseFs) {
         this.config = config;
         this.defaultFs = defaultFs;
@@ -105,11 +105,11 @@ public class StorageCleanupJob extends AbstractApplication {
     public void setDelete(boolean delete) {
         this.delete = delete;
     }
-    
+
     public void setForce(boolean force) {
         this.force = force;
     }
-    
+
     public List<String> getHiveGarbageTables() {
         return hiveGarbageTables;
     }
@@ -141,13 +141,13 @@ public class StorageCleanupJob extends AbstractApplication {
         logger.info("force option value: '" + optionsHelper.getOptionValue(OPTION_FORCE) + "'");
         delete = Boolean.parseBoolean(optionsHelper.getOptionValue(OPTION_DELETE));
         force = Boolean.parseBoolean(optionsHelper.getOptionValue(OPTION_FORCE));
-        
+
         cleanup();
     }
-    
+
     // function entrance
     public void cleanup() throws Exception {
-        
+
         cleanUnusedIntermediateHiveTable();
         cleanUnusedHBaseTables();
         cleanUnusedHdfsFiles();
@@ -169,27 +169,19 @@ public class StorageCleanupJob extends AbstractApplication {
         }
     }
 
-    protected class UnusedHdfsFileCollector {
-        LinkedHashSet<Pair<FileSystem, String>> list = new LinkedHashSet<>();
-        
-        public void add(FileSystem fs, String path) {
-            list.add(Pair.newPair(fs, path));
-        }
-    }
-    
     private void cleanUnusedHdfsFiles() throws IOException {
-        
+
         UnusedHdfsFileCollector collector = new UnusedHdfsFileCollector();
         collectUnusedHdfsFiles(collector);
-        
+
         if (collector.list.isEmpty()) {
             logger.info("No HDFS files to clean up");
             return;
         }
-        
+
         long garbageBytes = 0;
         List<String> garbageList = new ArrayList<>();
-        
+
         for (Pair<FileSystem, String> entry : collector.list) {
             FileSystem fs = entry.getKey();
             String path = entry.getValue();
@@ -199,7 +191,7 @@ public class StorageCleanupJob extends AbstractApplication {
                 ContentSummary sum = fs.getContentSummary(new Path(path));
                 if (sum != null)
                     garbageBytes += sum.getLength();
-                
+
                 if (delete) {
                     logger.info("Deleting HDFS path " + path);
                     fs.delete(new Path(path), true);
@@ -210,32 +202,33 @@ public class StorageCleanupJob extends AbstractApplication {
                 logger.error("Error dealing unused HDFS path " + path, e);
             }
         }
-        
+
         hdfsGarbageFileBytes = garbageBytes;
         hdfsGarbageFiles = garbageList;
     }
-    
+
     protected void collectUnusedHdfsFiles(UnusedHdfsFileCollector collector) throws IOException {
         if (StringUtils.isNotEmpty(config.getHBaseClusterFs())) {
-            cleanUnusedHdfsFiles(hbaseFs, collector);
+            cleanUnusedHdfsFiles(hbaseFs, collector, true);
         }
-        cleanUnusedHdfsFiles(defaultFs, collector);
+        cleanUnusedHdfsFiles(defaultFs, collector, false);
     }
-    
-    private void cleanUnusedHdfsFiles(FileSystem fs, UnusedHdfsFileCollector collector) throws IOException {
+
+    private void cleanUnusedHdfsFiles(FileSystem fs, UnusedHdfsFileCollector collector, boolean hbaseFs)
+            throws IOException {
         final JobEngineConfig engineConfig = new JobEngineConfig(config);
         final CubeManager cubeMgr = CubeManager.getInstance(config);
-        
+
         List<String> allHdfsPathsNeedToBeDeleted = new ArrayList<String>();
-        
+
         try {
-            FileStatus[] fStatus = fs.listStatus(new Path(config.getHdfsWorkingDirectory()));
+            FileStatus[] fStatus = fs
+                    .listStatus(Path.getPathWithoutSchemeAndAuthority(new Path(config.getHdfsWorkingDirectory())));
             if (fStatus != null) {
                 for (FileStatus status : fStatus) {
                     String path = status.getPath().getName();
                     if (path.startsWith("kylin-")) {
-                        String kylinJobPath = engineConfig.getHdfsWorkingDirectory() + path;
-                        allHdfsPathsNeedToBeDeleted.add(kylinJobPath);
+                        allHdfsPathsNeedToBeDeleted.add(status.getPath().toString());
                     }
                 }
             }
@@ -249,6 +242,13 @@ public class StorageCleanupJob extends AbstractApplication {
             final ExecutableState state = executableManager.getOutput(jobId).getState();
             if (!state.isFinalState()) {
                 String path = JobBuilderSupport.getJobWorkingDir(engineConfig.getHdfsWorkingDirectory(), jobId);
+
+                if (hbaseFs) {
+                    path = HBaseConnection.makeQualifiedPathInHBaseCluster(path);
+                } else {//Compatible with local fs, unit tests, mockito
+                    Path p = Path.getPathWithoutSchemeAndAuthority(new Path(path));
+                    path = HadoopUtil.getFileSystem(path).makeQualified(p).toString();
+                }
                 allHdfsPathsNeedToBeDeleted.remove(path);
                 logger.info("Skip " + path + " from deletion list, as the path 
belongs to job " + jobId
                         + " with status " + state);
@@ -261,13 +261,19 @@ public class StorageCleanupJob extends AbstractApplication {
                 String jobUuid = seg.getLastBuildJobID();
                 if (jobUuid != null && jobUuid.equals("") == false) {
                     String path = JobBuilderSupport.getJobWorkingDir(engineConfig.getHdfsWorkingDirectory(), jobUuid);
+                    if (hbaseFs) {
+                        path = HBaseConnection.makeQualifiedPathInHBaseCluster(path);
+                    } else {//Compatible with local fs, unit tests, mockito
+                        Path p = Path.getPathWithoutSchemeAndAuthority(new Path(path));
+                        path = HadoopUtil.getFileSystem(path).makeQualified(p).toString();
+                    }
                     allHdfsPathsNeedToBeDeleted.remove(path);
                     logger.info("Skip " + path + " from deletion list, as the 
path belongs to segment " + seg
                             + " of cube " + cube.getName());
                 }
             }
         }
-        
+
         // collect the garbage
         for (String path : allHdfsPathsNeedToBeDeleted)
             collector.add(fs, path);
@@ -283,12 +289,12 @@ public class StorageCleanupJob extends AbstractApplication {
                 throw e;
         }
     }
-    
+
     private void cleanUnusedIntermediateHiveTableInternal() throws Exception {
         final int uuidLength = 36;
         final String prefix = MetadataConstants.KYLIN_INTERMEDIATE_PREFIX;
         final String uuidPattern = "[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}";
-        
+
         List<String> hiveTableNames = getHiveTables();
         Iterable<String> kylinIntermediates = Iterables.filter(hiveTableNames, 
new Predicate<String>() {
             @Override
@@ -310,7 +316,7 @@ public class StorageCleanupJob extends AbstractApplication {
 
             try {
                 String segmentId = getSegmentIdFromJobId(jobId);
-                if (segmentId != null) {//some jobs are not cubing jobs 
+                if (segmentId != null) {//some jobs are not cubing jobs
                     segmentId2JobId.put(segmentId, jobId);
                 }
             } catch (Exception ex) {
@@ -369,7 +375,7 @@ public class StorageCleanupJob extends AbstractApplication {
             logger.info("No Hive tables to clean up");
             return;
         }
-        
+
         if (delete) {
             try {
                 deleteHiveTables(allHiveTablesNeedToBeDeleted, segmentId2JobId);
@@ -382,13 +388,13 @@ public class StorageCleanupJob extends AbstractApplication {
             }
         }
     }
-    
+
     // override by test
     protected List<String> getHiveTables() throws Exception {
         ISourceMetadataExplorer explr = SourceManager.getDefaultSource().getSourceMetadataExplorer();
         return explr.listTables(config.getHiveDatabaseForIntermediateTable());
     }
-    
+
     // override by test
     protected CliCommandExecutor getCliCommandExecutor() throws IOException {
         return config.getCliCommandExecutor();
@@ -398,7 +404,7 @@ public class StorageCleanupJob extends AbstractApplication {
             throws IOException {
         final JobEngineConfig engineConfig = new JobEngineConfig(config);
         final int uuidLength = 36;
-        
+
         final String useDatabaseHql = "USE " + config.getHiveDatabaseForIntermediateTable() + ";";
         final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
         hiveCmdBuilder.addStatement(useDatabaseHql);
@@ -407,8 +413,8 @@ public class StorageCleanupJob extends AbstractApplication {
             logger.info("Deleting Hive table " + delHive);
         }
         getCliCommandExecutor().execute(hiveCmdBuilder.build());
-        
-        // If kylin.source.hive.keep-flat-table, some intermediate table might be kept. 
+
+        // If kylin.source.hive.keep-flat-table, some intermediate table might be kept.
         // Do delete external path.
         for (String tableToDelete : allHiveTablesNeedToBeDeleted) {
             String uuid = tableToDelete.substring(tableToDelete.length() - uuidLength, tableToDelete.length());
@@ -449,4 +455,12 @@ public class StorageCleanupJob extends AbstractApplication {
         return false;
     }
 
+    protected class UnusedHdfsFileCollector {
+        LinkedHashSet<Pair<FileSystem, String>> list = new LinkedHashSet<>();
+
+        public void add(FileSystem fs, String path) {
+            list.add(Pair.newPair(fs, path));
+        }
+    }
+
 }
diff --git a/server-base/src/test/java/org/apache/kylin/rest/job/StorageCleanupJobTest.java b/server-base/src/test/java/org/apache/kylin/rest/job/StorageCleanupJobTest.java
index e494884..3e4d9cb 100644
--- a/server-base/src/test/java/org/apache/kylin/rest/job/StorageCleanupJobTest.java
+++ b/server-base/src/test/java/org/apache/kylin/rest/job/StorageCleanupJobTest.java
@@ -84,15 +84,15 @@ public class StorageCleanupJobTest {
         FileStatus f2 = mock(FileStatus.class);
         FileStatus f3 = mock(FileStatus.class);
         // only remove FINISHED and DISCARDED job intermediate files, so this exclude.
-        when(f1.getPath()).thenReturn(new Path("kylin-091a0322-249c-43e7-91df-205603ab6883"));
+        when(f1.getPath()).thenReturn(new Path("file:///tmp/examples/test_metadata/kylin-091a0322-249c-43e7-91df-205603ab6883"));
         // remove every segment working dir from deletion list, so this exclude.
-        when(f2.getPath()).thenReturn(new Path("kylin-bcf2f125-9b0b-40dd-9509-95ec59b31333"));
-        when(f3.getPath()).thenReturn(new Path("kylin-to-be-delete"));
+        when(f2.getPath()).thenReturn(new Path("file:///tmp/examples/test_metadata/kylin-bcf2f125-9b0b-40dd-9509-95ec59b31333"));
+        when(f3.getPath()).thenReturn(new Path("file:///tmp/examples/test_metadata/kylin-to-be-delete"));
         statuses[0] = f1;
         statuses[1] = f2;
         statuses[2] = f3;
 
-        when(mockFs.listStatus(p1)).thenReturn(statuses);
+        when(mockFs.listStatus(Path.getPathWithoutSchemeAndAuthority(p1))).thenReturn(statuses);
         Path p2 = new Path("file:///tmp/examples/test_metadata/kylin-to-be-delete");
         when(mockFs.exists(p2)).thenReturn(true);
     }
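
A note on the Mockito changes above: stubs match arguments by equals(), so once the production code calls listStatus with the scheme and authority stripped, the stub has to be keyed on that exact Path or the mock falls back to its default answer and the job collects nothing. A minimal standalone sketch of that behaviour (mockFs, p1 and statuses mirror the test's names; the concrete values are assumptions):

    import static org.mockito.Mockito.mock;
    import static org.mockito.Mockito.when;

    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class ListStatusStubSketch {
        public static void main(String[] args) throws Exception {
            FileSystem mockFs = mock(FileSystem.class);
            Path p1 = new Path("file:///tmp/examples/test_metadata/");
            FileStatus[] statuses = new FileStatus[0];

            // Key the stub on the stripped path, matching what
            // StorageCleanupJob now passes to listStatus.
            when(mockFs.listStatus(Path.getPathWithoutSchemeAndAuthority(p1))).thenReturn(statuses);

            // Hits the stub: same Path by equals().
            System.out.println(mockFs.listStatus(Path.getPathWithoutSchemeAndAuthority(p1)).length);
            // Unstubbed argument: Mockito's default answer (null for arrays),
            // which the cleanup job treats as "nothing to collect".
            System.out.println(mockFs.listStatus(p1));
        }
    }
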
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java
index 6678418..43c1a36 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java
@@ -238,6 +238,15 @@ public class HBaseConnection {
         return fs.makeQualified(path).toString();
     }
 
+    public static FileSystem getFileSystemInHBaseCluster(String inPath) {
+        Path path = new Path(inPath);
+        path = Path.getPathWithoutSchemeAndAuthority(path);
+
+        FileSystem fs = HadoopUtil.getFileSystem(path, getCurrentHBaseConfiguration()); // Must be HBase's FS, not working FS
+        return fs;
+    }
+
+
     // ============================================================================
 
     // returned Connection can be shared by multiple threads and does not require close()
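
For completeness, the new helper is what StorageCleanupJob's default constructor now uses to obtain the HBase cluster's FileSystem. A rough usage sketch, assuming a configured Kylin environment (the printed URIs are illustrative only):

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.kylin.common.KylinConfig;
    import org.apache.kylin.common.util.HadoopUtil;
    import org.apache.kylin.storage.hbase.HBaseConnection;

    public class HBaseFsExample {
        public static void main(String[] args) throws Exception {
            KylinConfig config = KylinConfig.getInstanceFromEnv();

            // Working-cluster FS, unchanged from before.
            FileSystem defaultFs = HadoopUtil.getWorkingFileSystem();

            // FS resolved against the HBase cluster's configuration, so cleanup
            // of the HBase-side working directory targets the right HDFS.
            FileSystem hbaseFs = HBaseConnection
                    .getFileSystemInHBaseCluster(config.getHdfsWorkingDirectory());

            System.out.println("default FS: " + defaultFs.getUri());
            System.out.println("HBase FS:   " + hbaseFs.getUri());
        }
    }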
