Rename stats intermediate tables and make them droppable (#1267)

* minor, enhance StorageCleanup tool

* #1100, rename stats intermediate table


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/bf871691
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/bf871691
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/bf871691

Branch: refs/heads/master
Commit: bf8716919aaf6bf3880b12fc347aa04d98c6dc83
Parents: 1e38694
Author: 成 <[email protected]>
Authored: Wed Jun 21 12:53:52 2017 +0800
Committer: Billy(Yiming) Liu <[email protected]>
Committed: Wed Jun 21 12:53:52 2017 +0800

----------------------------------------------------------------------
 .../kylin/engine/mr/common/BatchConstants.java  |  3 +-
 .../kylin/rest/job/StorageCleanupJob.java       | 63 ++++++++++++--------
 2 files changed, 40 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/bf871691/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
index 602b4bb..0cb23ac 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
@@ -45,6 +45,8 @@ public interface BatchConstants {
     String CFG_REGION_NUMBER_MAX = "region.number.max";
     String CFG_REGION_SPLIT_SIZE = "region.split.size";
     String CFG_HFILE_SIZE_GB = "hfile.size.gb";
+    String CFG_STATS_JOB_ID = "stats.job.id";
+    String CFG_STATS_JOB_FREQUENCY = "stats.sample.frequency";
 
     String CFG_KYLIN_LOCAL_TEMP_DIR = "/tmp/kylin/";
     String CFG_KYLIN_HDFS_TEMP_DIR = "/tmp/kylin/";
@@ -62,7 +64,6 @@ public interface BatchConstants {
     String CFG_OUTPUT_STATISTICS = "statistics";
     String CFG_OUTPUT_PARTITION = "partition";
 
-
     /**
      * command line ARGuments
      */

http://git-wip-us.apache.org/repos/asf/kylin/blob/bf871691/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanupJob.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanupJob.java b/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanupJob.java
index 448e3c6..9b72788 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanupJob.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanupJob.java
@@ -60,15 +60,18 @@ import com.google.common.collect.Maps;
 public class StorageCleanupJob extends AbstractApplication {
 
     @SuppressWarnings("static-access")
-    protected static final Option OPTION_DELETE = 
OptionBuilder.withArgName("delete").hasArg().isRequired(false).withDescription("Delete
 the unused storage").create("delete");
-    protected static final Option OPTION_FORCE = 
OptionBuilder.withArgName("force").hasArg().isRequired(false).withDescription("Warning:
 will delete all kylin intermediate hive tables").create("force");
+    protected static final Option OPTION_DELETE = 
OptionBuilder.withArgName("delete").hasArg().isRequired(false)
+            .withDescription("Delete the unused storage").create("delete");
+    protected static final Option OPTION_FORCE = 
OptionBuilder.withArgName("force").hasArg().isRequired(false)
+            .withDescription("Warning: will delete all kylin intermediate hive 
tables").create("force");
 
     protected static final Logger logger = 
LoggerFactory.getLogger(StorageCleanupJob.class);
     public static final int deleteTimeout = 10; // Unit minute
 
     protected boolean delete = false;
     protected boolean force = false;
-    protected static ExecutableManager executableManager = 
ExecutableManager.getInstance(KylinConfig.getInstanceFromEnv());
+    protected static ExecutableManager executableManager = ExecutableManager
+            .getInstance(KylinConfig.getInstanceFromEnv());
 
     protected void cleanUnusedHBaseTables() throws IOException {
         KylinConfig config = KylinConfig.getInstanceFromEnv();
@@ -76,7 +79,8 @@ public class StorageCleanupJob extends AbstractApplication {
             try {
                 // use reflection to isolate NoClassDef errors when HBase is 
not available
                 Class hbaseCleanUpUtil = 
Class.forName("org.apache.kylin.rest.job.StorageCleanJobHbaseUtil");
-                Method cleanUnusedHBaseTables = 
hbaseCleanUpUtil.getDeclaredMethod("cleanUnusedHBaseTables", boolean.class, 
int.class);
+                Method cleanUnusedHBaseTables = 
hbaseCleanUpUtil.getDeclaredMethod("cleanUnusedHBaseTables",
+                        boolean.class, int.class);
                 cleanUnusedHBaseTables.invoke(hbaseCleanUpUtil, delete, 
deleteTimeout);
             } catch (Throwable e) {
                 throw new IOException(e);
@@ -132,7 +136,8 @@ public class StorageCleanupJob extends AbstractApplication {
             if (!state.isFinalState()) {
                 String path = 
JobBuilderSupport.getJobWorkingDir(engineConfig.getHdfsWorkingDirectory(), 
jobId);
                 allHdfsPathsNeedToBeDeleted.remove(path);
-                logger.info("Skip " + path + " from deletion list, as the path 
belongs to job " + jobId + " with status " + state);
+                logger.info("Skip " + path + " from deletion list, as the path 
belongs to job " + jobId
+                        + " with status " + state);
             }
         }
 
@@ -143,7 +148,8 @@ public class StorageCleanupJob extends AbstractApplication {
                 if (jobUuid != null && jobUuid.equals("") == false) {
                     String path = 
JobBuilderSupport.getJobWorkingDir(engineConfig.getHdfsWorkingDirectory(), 
jobUuid);
                     allHdfsPathsNeedToBeDeleted.remove(path);
-                    logger.info("Skip " + path + " from deletion list, as the 
path belongs to segment " + seg + " of cube " + cube.getName());
+                    logger.info("Skip " + path + " from deletion list, as the 
path belongs to segment " + seg
+                            + " of cube " + cube.getName());
                 }
             }
         }
@@ -227,22 +233,25 @@ public class StorageCleanupJob extends AbstractApplication {
 
             boolean isNeedDel = true;
 
-            if (line.length() > preFix.length() + uuidLength) {
-                String uuid = line.substring(line.length() - uuidLength, 
line.length());
-                uuid = uuid.replace("_", "-");
-                final Pattern UUId_PATTERN = Pattern.compile(uuidPattern);
-                if (UUId_PATTERN.matcher(uuid).matches()) {
-                    //Check whether it's a hive table in use
-                    if (isTableInUse(uuid, workingJobList)) {
-                        logger.info("Skip deleting because the table is in 
use");
-                        isNeedDel = false;
-                    }
-                } else {
-                    logger.info("Skip deleting because not match pattern");
-                    isNeedDel = false;
-                }
-            } else {
-                logger.info("Skip deleting because length not qualified");
+            if (line.length() < preFix.length() + uuidLength) {
+                logger.info("Skip deleting because length is not qualified");
+                continue;
+            }
+
+            String uuid = line.substring(line.length() - uuidLength, 
line.length());
+            uuid = uuid.replace("_", "-");
+            final Pattern UUID_PATTERN = Pattern.compile(uuidPattern);
+
+            if (!UUID_PATTERN.matcher(uuid).matches()) {
+                logger.info("Skip deleting because pattern doesn't match");
+                continue;
+            }
+
+            //Some intermediate table ends with job's uuid
+            if (allJobs.contains(uuid)) {
+                isNeedDel = !workingJobList.contains(uuid);
+            } else if (isTableInUse(uuid, workingJobList)) {
+                logger.info("Skip deleting because the table is in use");
                 isNeedDel = false;
             }
 
@@ -270,17 +279,21 @@ public class StorageCleanupJob extends AbstractApplication {
                     String segmentId = uuid.replace("_", "-");
 
                     if (segmentId2JobId.containsKey(segmentId)) {
-                        String path = 
JobBuilderSupport.getJobWorkingDir(engineConfig.getHdfsWorkingDirectory(), 
segmentId2JobId.get(segmentId)) + "/" + tableToDelete;
+                        String path = 
JobBuilderSupport.getJobWorkingDir(engineConfig.getHdfsWorkingDirectory(),
+                                segmentId2JobId.get(segmentId)) + "/" + 
tableToDelete;
                         Path externalDataPath = new Path(path);
                         FileSystem fs = HadoopUtil.getWorkingFileSystem();
                         if (fs.exists(externalDataPath)) {
                             fs.delete(externalDataPath, true);
                             logger.info("Hive table {}'s external path {} 
deleted", tableToDelete, path);
                         } else {
-                            logger.info("Hive table {}'s external path {} not 
exist. It's normal if kylin.source.hive.keep-flat-table set false (By 
default)", tableToDelete, path);
+                            logger.info(
+                                    "Hive table {}'s external path {} not 
exist. It's normal if kylin.source.hive.keep-flat-table set false (By default)",
+                                    tableToDelete, path);
                         }
                     } else {
-                        logger.warn("Hive table {}'s job ID not found, 
segmentId2JobId: {}", tableToDelete, segmentId2JobId.toString());
+                        logger.warn("Hive table {}'s job ID not found, 
segmentId2JobId: {}", tableToDelete,
+                                segmentId2JobId.toString());
                     }
                 }
             } catch (IOException e) {

Reply via email to