Repository: kylin
Updated Branches:
  refs/heads/yang21-cdh5.7 97d675de6 -> 3e77cba69 (forced update)


KYLIN-2144 move useful operation tools to org.apache.kylin.tool


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/d2fde806
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/d2fde806
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/d2fde806

Branch: refs/heads/yang21-cdh5.7
Commit: d2fde8067f39150aac2cee30e8c628f2d73f6c61
Parents: de9f3ad
Author: Hongbin Ma <mahong...@apache.org>
Authored: Tue Nov 1 17:48:37 2016 +0800
Committer: Hongbin Ma <mahong...@apache.org>
Committed: Tue Nov 1 17:56:42 2016 +0800

----------------------------------------------------------------------
 build/bin/metastore.sh                          |   2 +-
 build/smoke-test/smoke-test.sh                  |   2 +-
 .../engine/mr/steps/MetadataCleanupJob.java     |   3 +
 kylin-it/pom.xml                                |   4 +
 .../kylin/provision/BuildCubeWithEngine.java    |   2 +-
 .../kylin/provision/BuildCubeWithStream.java    |   2 +-
 server-base/pom.xml                             |   4 +
 .../apache/kylin/rest/service/AdminService.java |   2 +-
 .../storage/hbase/util/CubeMigrationCLI.java    |   2 +
 .../hbase/util/CubeMigrationCheckCLI.java       |   2 +
 .../hbase/util/ExtendCubeToHybridCLI.java       |   4 +
 .../storage/hbase/util/HiveCmdBuilder.java      |   1 +
 .../storage/hbase/util/StorageCleanupJob.java   |   3 +
 .../org/apache/kylin/tool/CubeMigrationCLI.java | 586 +++++++++++++++++++
 .../kylin/tool/CubeMigrationCheckCLI.java       | 213 +++++++
 .../kylin/tool/ExtendCubeToHybridCLI.java       | 261 +++++++++
 .../apache/kylin/tool/MetadataCleanupJob.java   | 180 ++++++
 .../apache/kylin/tool/StorageCleanupJob.java    | 364 ++++++++++++
 18 files changed, 1632 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
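
A quick orientation note on the relocation (class names below are taken verbatim from the diffs that follow; the --delete value is only illustrative): the tools keep the same kylin.sh entry point, only the package changes, e.g.

    # old location (now deprecated)
    ${KYLIN_HOME}/bin/kylin.sh org.apache.kylin.storage.hbase.util.StorageCleanupJob --delete true
    # new location under org.apache.kylin.tool
    ${KYLIN_HOME}/bin/kylin.sh org.apache.kylin.tool.StorageCleanupJob --delete true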


http://git-wip-us.apache.org/repos/asf/kylin/blob/d2fde806/build/bin/metastore.sh
----------------------------------------------------------------------
diff --git a/build/bin/metastore.sh b/build/bin/metastore.sh
index baf3d7a..a7a9e27 100755
--- a/build/bin/metastore.sh
+++ b/build/bin/metastore.sh
@@ -95,7 +95,7 @@ then
 elif [ "$1" == "clean" ]
 then
 
-    ${KYLIN_HOME}/bin/kylin.sh org.apache.kylin.engine.mr.steps.MetadataCleanupJob "${@:2}"
+    ${KYLIN_HOME}/bin/kylin.sh org.apache.kylin.tool.MetadataCleanupJob "${@:2}"
 
 else
     echo "usage: metastore.sh backup"

http://git-wip-us.apache.org/repos/asf/kylin/blob/d2fde806/build/smoke-test/smoke-test.sh
----------------------------------------------------------------------
diff --git a/build/smoke-test/smoke-test.sh b/build/smoke-test/smoke-test.sh
index c21bd6d..f174b47 100755
--- a/build/smoke-test/smoke-test.sh
+++ b/build/smoke-test/smoke-test.sh
@@ -72,6 +72,6 @@ cd -
 
 # Tear down stage
 ${KYLIN_HOME}/bin/metastore.sh clean --delete true
-${KYLIN_HOME}/bin/kylin.sh org.apache.kylin.storage.hbase.util.StorageCleanupJob --delete true
+${KYLIN_HOME}/bin/kylin.sh org.apache.kylin.tool.StorageCleanupJob --delete true
 ${KYLIN_HOME}/bin/metastore.sh reset
 ${KYLIN_HOME}/bin/kylin.sh stop

http://git-wip-us.apache.org/repos/asf/kylin/blob/d2fde806/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MetadataCleanupJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MetadataCleanupJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MetadataCleanupJob.java
index 962697e..2dcef20 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MetadataCleanupJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MetadataCleanupJob.java
@@ -43,6 +43,7 @@ import com.google.common.collect.Sets;
 
 /**
  */
+@Deprecated
 public class MetadataCleanupJob extends AbstractHadoopJob {
 
     @SuppressWarnings("static-access")
@@ -174,6 +175,8 @@ public class MetadataCleanupJob extends AbstractHadoopJob {
     }
 
     public static void main(String[] args) throws Exception {
+        logger.warn("org.apache.kylin.engine.mr.steps.MetadataCleanupJob is 
deprecated, use org.apache.kylin.tool.MetadataCleanupJob instead");
+
         int exitCode = ToolRunner.run(new MetadataCleanupJob(), args);
         System.exit(exitCode);
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/d2fde806/kylin-it/pom.xml
----------------------------------------------------------------------
diff --git a/kylin-it/pom.xml b/kylin-it/pom.xml
index 43e47c9..3bde97c 100644
--- a/kylin-it/pom.xml
+++ b/kylin-it/pom.xml
@@ -61,6 +61,10 @@
         </dependency>
         <dependency>
             <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-tool</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kylin</groupId>
             <artifactId>kylin-storage-hbase</artifactId>
         </dependency>
         <dependency>

http://git-wip-us.apache.org/repos/asf/kylin/blob/d2fde806/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
index 3d60a3c..2b1a917 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
@@ -56,8 +56,8 @@ import org.apache.kylin.job.execution.ExecutableState;
 import org.apache.kylin.job.impl.threadpool.DefaultScheduler;
 import org.apache.kylin.job.manager.ExecutableManager;
 import org.apache.kylin.storage.hbase.util.HBaseRegionSizeCalculator;
-import org.apache.kylin.storage.hbase.util.StorageCleanupJob;
 import org.apache.kylin.storage.hbase.util.ZookeeperJobLock;
+import org.apache.kylin.tool.StorageCleanupJob;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/d2fde806/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
index 9490560..4aec310 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
@@ -37,7 +37,7 @@ import org.apache.kylin.job.streaming.KafkaDataLoader;
 import org.apache.kylin.metadata.realization.RealizationType;
 import org.apache.kylin.source.kafka.KafkaConfigManager;
 import org.apache.kylin.source.kafka.config.KafkaConfig;
-import org.apache.kylin.storage.hbase.util.StorageCleanupJob;
+import org.apache.kylin.tool.StorageCleanupJob;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/d2fde806/server-base/pom.xml
----------------------------------------------------------------------
diff --git a/server-base/pom.xml b/server-base/pom.xml
index 1302051..caab63c 100644
--- a/server-base/pom.xml
+++ b/server-base/pom.xml
@@ -62,6 +62,10 @@
             <groupId>org.apache.kylin</groupId>
             <artifactId>kylin-source-kafka</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-tool</artifactId>
+        </dependency>
 
         <dependency>
             <groupId>net.sf.ehcache</groupId>

http://git-wip-us.apache.org/repos/asf/kylin/blob/d2fde806/server-base/src/main/java/org/apache/kylin/rest/service/AdminService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/AdminService.java b/server-base/src/main/java/org/apache/kylin/rest/service/AdminService.java
index 1a94967..ace0388 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/AdminService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/AdminService.java
@@ -28,7 +28,7 @@ import org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.rest.constant.Constant;
 import org.apache.kylin.rest.exception.InternalErrorException;
-import org.apache.kylin.storage.hbase.util.StorageCleanupJob;
+import org.apache.kylin.tool.StorageCleanupJob;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.security.access.prepost.PreAuthorize;

http://git-wip-us.apache.org/repos/asf/kylin/blob/d2fde806/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java
index 52aa7ea..dcf1690 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java
@@ -76,6 +76,7 @@ import org.slf4j.LoggerFactory;
  * Note that different envs are assumed to share the same hadoop cluster,
  * including hdfs, hbase and hive.
  */
+@Deprecated
 public class CubeMigrationCLI {
 
     private static final Logger logger = LoggerFactory.getLogger(CubeMigrationCLI.class);
@@ -93,6 +94,7 @@ public class CubeMigrationCLI {
     private static final String ACL_INFO_FAMILY_PARENT_COLUMN = "p";
 
     public static void main(String[] args) throws IOException, InterruptedException {
+        logger.warn("org.apache.kylin.storage.hbase.util.CubeMigrationCLI is deprecated, use org.apache.kylin.tool.CubeMigrationCLI instead");
 
         if (args.length != 8) {
             usage();

http://git-wip-us.apache.org/repos/asf/kylin/blob/d2fde806/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCheckCLI.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCheckCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCheckCLI.java
index 295750a..8bd4abf 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCheckCLI.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCheckCLI.java
@@ -49,6 +49,7 @@ import com.google.common.collect.Lists;
  * for all of cube segments' corresponding HTables after migrating a cube
  * <p/>
  */
+@Deprecated
 public class CubeMigrationCheckCLI {
 
     private static final Logger logger = LoggerFactory.getLogger(CubeMigrationCheckCLI.class);
@@ -68,6 +69,7 @@ public class CubeMigrationCheckCLI {
     private boolean ifFix = false;
 
     public static void main(String[] args) throws ParseException, IOException {
+        logger.warn("org.apache.kylin.storage.hbase.util.CubeMigrationCheckCLI 
is deprecated, use org.apache.kylin.tool.CubeMigrationCheckCLI instead");
 
         OptionsHelper optionsHelper = new OptionsHelper();
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/d2fde806/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ExtendCubeToHybridCLI.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ExtendCubeToHybridCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ExtendCubeToHybridCLI.java
index a5a85fa..89063e6 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ExtendCubeToHybridCLI.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ExtendCubeToHybridCLI.java
@@ -60,7 +60,9 @@ import com.google.common.collect.Lists;
 
 /**
  * Created by dongli on 12/29/15.
+ * 
  */
+@Deprecated
 public class ExtendCubeToHybridCLI {
     public static final String ACL_INFO_FAMILY = "i";
     private static final String CUBE_POSTFIX = "_old";
@@ -83,6 +85,8 @@ public class ExtendCubeToHybridCLI {
     }
 
     public static void main(String[] args) throws Exception {
+        logger.warn("org.apache.kylin.storage.hbase.util.ExtendCubeToHybridCLI 
is deprecated, use org.apache.kylin.tool.ExtendCubeToHybridCLI instead");
+
         if (args.length != 2 && args.length != 3) {
             System.out.println("Usage: ExtendCubeToHybridCLI project cube 
[partition_date]");
             return;

http://git-wip-us.apache.org/repos/asf/kylin/blob/d2fde806/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HiveCmdBuilder.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HiveCmdBuilder.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HiveCmdBuilder.java
index c435f34..9252254 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HiveCmdBuilder.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HiveCmdBuilder.java
@@ -34,6 +34,7 @@ import com.google.common.collect.Lists;
 /**
  * Created by dongli on 2/29/16.
  */
+@Deprecated
 public class HiveCmdBuilder {
     private static final Logger logger = LoggerFactory.getLogger(HiveCmdBuilder.class);
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/d2fde806/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java
index c91de7b..9ecc42d 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java
@@ -59,6 +59,7 @@ import org.apache.kylin.metadata.realization.IRealizationConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+@Deprecated
 public class StorageCleanupJob extends AbstractApplication {
 
     @SuppressWarnings("static-access")
@@ -356,6 +357,8 @@ public class StorageCleanupJob extends AbstractApplication {
     }
 
     public static void main(String[] args) throws Exception {
+        logger.warn("org.apache.kylin.storage.hbase.util.StorageCleanupJob is 
deprecated, use org.apache.kylin.tool.StorageCleanupJob instead");
+
         StorageCleanupJob cli = new StorageCleanupJob();
         cli.execute(args);
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/d2fde806/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java
----------------------------------------------------------------------
diff --git a/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java b/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java
new file mode 100644
index 0000000..46f8d75
--- /dev/null
+++ b/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java
@@ -0,0 +1,586 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *  
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.tool;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.JsonSerializer;
+import org.apache.kylin.common.persistence.RawResource;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.persistence.Serializer;
+import org.apache.kylin.common.restclient.RestClient;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.dict.DictionaryInfo;
+import org.apache.kylin.dict.DictionaryManager;
+import org.apache.kylin.dict.lookup.SnapshotManager;
+import org.apache.kylin.dict.lookup.SnapshotTable;
+import org.apache.kylin.engine.mr.JobBuilderSupport;
+import org.apache.kylin.metadata.cachesync.Broadcaster;
+import org.apache.kylin.metadata.model.DataModelDesc;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.project.ProjectInstance;
+import org.apache.kylin.metadata.realization.IRealizationConstants;
+import org.apache.kylin.metadata.realization.RealizationStatusEnum;
+import org.apache.kylin.metadata.realization.RealizationType;
+import org.apache.kylin.storage.hbase.HBaseConnection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <p/>
+ * This tool serves for the purpose of migrating cubes. e.g. upgrade cube from
+ * dev env to test(prod) env, or vice versa.
+ * <p/>
+ * Note that different envs are assumed to share the same hadoop cluster,
+ * including hdfs, hbase and hive.
+ */
+public class CubeMigrationCLI {
+
+    private static final Logger logger = LoggerFactory.getLogger(CubeMigrationCLI.class);
+
+    private static List<Opt> operations;
+    private static KylinConfig srcConfig;
+    private static KylinConfig dstConfig;
+    private static ResourceStore srcStore;
+    private static ResourceStore dstStore;
+    private static FileSystem hdfsFS;
+    private static HBaseAdmin hbaseAdmin;
+
+    public static final String ACL_INFO_FAMILY = "i";
+    private static final String ACL_TABLE_NAME = "_acl";
+    private static final String ACL_INFO_FAMILY_PARENT_COLUMN = "p";
+
+    public static void main(String[] args) throws IOException, InterruptedException {
+
+        if (args.length != 8) {
+            usage();
+            System.exit(1);
+        }
+
+        moveCube(args[0], args[1], args[2], args[3], args[4], args[5], args[6], args[7]);
+    }
+
+    private static void usage() {
+        System.out.println("Usage: CubeMigrationCLI srcKylinConfigUri 
dstKylinConfigUri cubeName projectName copyAclOrNot purgeOrNot 
overwriteIfExists realExecute");
+        System.out.println(" srcKylinConfigUri: The KylinConfig of the 
cube’s source \n" + "dstKylinConfigUri: The KylinConfig of the cube’s new 
home \n" + "cubeName: the name of cube to be migrated. \n" + "projectName: The 
target project in the target environment.(Make sure it exist) \n" + 
"copyAclOrNot: true or false: whether copy cube ACL to target environment. \n" 
+ "purgeOrNot: true or false: whether purge the cube from src server after the 
migration. \n" + "overwriteIfExists: overwrite cube if it already exists in the 
target environment. \n" + "realExecute: if false, just print the operations to 
take, if true, do the real migration. \n");
+
+    }
+
+    public static void moveCube(KylinConfig srcCfg, KylinConfig dstCfg, String cubeName, String projectName, String copyAcl, String purgeAndDisable, String overwriteIfExists, String realExecute) throws IOException, InterruptedException {
+
+        srcConfig = srcCfg;
+        srcStore = ResourceStore.getStore(srcConfig);
+        dstConfig = dstCfg;
+        dstStore = ResourceStore.getStore(dstConfig);
+
+        CubeManager cubeManager = CubeManager.getInstance(srcConfig);
+        CubeInstance cube = cubeManager.getCube(cubeName);
+        logger.info("cube to be moved is : " + cubeName);
+
+        if (cube.getStatus() != RealizationStatusEnum.READY)
+            throw new IllegalStateException("Cannot migrate cube that is not 
in READY state.");
+
+        for (CubeSegment segment : cube.getSegments()) {
+            if (segment.getStatus() != SegmentStatusEnum.READY) {
+                throw new IllegalStateException("At least one segment is not 
in READY state");
+            }
+        }
+
+        checkAndGetHbaseUrl();
+
+        Configuration conf = HBaseConnection.getCurrentHBaseConfiguration();
+        hbaseAdmin = new HBaseAdmin(conf);
+
+        hdfsFS = FileSystem.get(new Configuration());
+
+        operations = new ArrayList<Opt>();
+
+        copyFilesInMetaStore(cube, overwriteIfExists);
+        renameFoldersInHdfs(cube);
+        changeHtableHost(cube);
+        addCubeAndModelIntoProject(cube, cubeName, projectName);
+        if (Boolean.parseBoolean(copyAcl) == true) {
+            copyACL(cube, projectName);
+        }
+
+        if (Boolean.parseBoolean(purgeAndDisable) == true) {
+            purgeAndDisable(cubeName); // this should be the last action
+        }
+
+        if (realExecute.equalsIgnoreCase("true")) {
+            doOpts();
+            checkMigrationSuccess(dstConfig, cubeName, true);
+            updateMeta(dstConfig);
+        } else {
+            showOpts();
+        }
+    }
+
+    public static void moveCube(String srcCfgUri, String dstCfgUri, String cubeName, String projectName, String copyAcl, String purgeAndDisable, String overwriteIfExists, String realExecute) throws IOException, InterruptedException {
+
+        moveCube(KylinConfig.createInstanceFromUri(srcCfgUri), KylinConfig.createInstanceFromUri(dstCfgUri), cubeName, projectName, copyAcl, purgeAndDisable, overwriteIfExists, realExecute);
+    }
+
+    public static void checkMigrationSuccess(KylinConfig kylinConfig, String cubeName, Boolean ifFix) throws IOException {
+        CubeMigrationCheckCLI checkCLI = new CubeMigrationCheckCLI(kylinConfig, ifFix);
+        checkCLI.execute(cubeName);
+    }
+
+    private static String checkAndGetHbaseUrl() {
+        String srcMetadataUrl = srcConfig.getMetadataUrl();
+        String dstMetadataUrl = dstConfig.getMetadataUrl();
+
+        logger.info("src metadata url is " + srcMetadataUrl);
+        logger.info("dst metadata url is " + dstMetadataUrl);
+
+        int srcIndex = srcMetadataUrl.toLowerCase().indexOf("hbase");
+        int dstIndex = dstMetadataUrl.toLowerCase().indexOf("hbase");
+        if (srcIndex < 0 || dstIndex < 0)
+            throw new IllegalStateException("Both metadata urls should be 
hbase metadata url");
+
+        String srcHbaseUrl = srcMetadataUrl.substring(srcIndex).trim();
+        String dstHbaseUrl = dstMetadataUrl.substring(dstIndex).trim();
+        if (!srcHbaseUrl.equalsIgnoreCase(dstHbaseUrl)) {
+            throw new IllegalStateException("hbase url not equal! ");
+        }
+
+        logger.info("hbase url is " + srcHbaseUrl.trim());
+        return srcHbaseUrl.trim();
+    }
+
+    private static void renameFoldersInHdfs(CubeInstance cube) {
+        for (CubeSegment segment : cube.getSegments()) {
+
+            String jobUuid = segment.getLastBuildJobID();
+            String src = JobBuilderSupport.getJobWorkingDir(srcConfig.getHdfsWorkingDirectory(), jobUuid);
+            String tgt = JobBuilderSupport.getJobWorkingDir(dstConfig.getHdfsWorkingDirectory(), jobUuid);
+
+            operations.add(new Opt(OptType.RENAME_FOLDER_IN_HDFS, new Object[] { src, tgt }));
+        }
+
+    }
+
+    private static void changeHtableHost(CubeInstance cube) {
+        for (CubeSegment segment : cube.getSegments()) {
+            operations.add(new Opt(OptType.CHANGE_HTABLE_HOST, new Object[] { segment.getStorageLocationIdentifier() }));
+        }
+    }
+
+    private static void copyACL(CubeInstance cube, String projectName) {
+        operations.add(new Opt(OptType.COPY_ACL, new Object[] { cube.getUuid(), cube.getDescriptor().getModel().getUuid(), projectName }));
+    }
+
+    private static void copyFilesInMetaStore(CubeInstance cube, String overwriteIfExists) throws IOException {
+
+        List<String> metaItems = new ArrayList<String>();
+        Set<String> dictAndSnapshot = new HashSet<String>();
+        listCubeRelatedResources(cube, metaItems, dictAndSnapshot);
+
+        if (dstStore.exists(cube.getResourcePath()) && 
!overwriteIfExists.equalsIgnoreCase("true"))
+            throw new IllegalStateException("The cube named " + cube.getName() 
+ " already exists on target metadata store. Use overwriteIfExists to overwrite 
it");
+
+        for (String item : metaItems) {
+            operations.add(new Opt(OptType.COPY_FILE_IN_META, new Object[] { item }));
+        }
+
+        for (String item : dictAndSnapshot) {
+            operations.add(new Opt(OptType.COPY_DICT_OR_SNAPSHOT, new Object[] { item, cube.getName() }));
+        }
+    }
+    private static void addCubeAndModelIntoProject(CubeInstance srcCube, String cubeName, String projectName) throws IOException {
+        String projectResPath = ProjectInstance.concatResourcePath(projectName);
+        if (!dstStore.exists(projectResPath))
+            throw new IllegalStateException("The target project " + 
projectName + "does not exist");
+
+        operations.add(new Opt(OptType.ADD_INTO_PROJECT, new Object[] { srcCube, cubeName, projectName }));
+    }
+
+    private static void purgeAndDisable(String cubeName) throws IOException {
+        operations.add(new Opt(OptType.PURGE_AND_DISABLE, new Object[] { cubeName }));
+    }
+
+    private static void listCubeRelatedResources(CubeInstance cube, List<String> metaResource, Set<String> dictAndSnapshot) throws IOException {
+
+        CubeDesc cubeDesc = cube.getDescriptor();
+        metaResource.add(cube.getResourcePath());
+        metaResource.add(cubeDesc.getResourcePath());
+        metaResource.add(DataModelDesc.concatResourcePath(cubeDesc.getModelName()));
+
+        for (String table : cubeDesc.getModel().getAllTables()) {
+            metaResource.add(TableDesc.concatResourcePath(table.toUpperCase()));
+        }
+
+        for (CubeSegment segment : cube.getSegments()) {
+            metaResource.add(segment.getStatisticsResourcePath());
+            dictAndSnapshot.addAll(segment.getSnapshotPaths());
+            dictAndSnapshot.addAll(segment.getDictionaryPaths());
+        }
+    }
+
+    private static enum OptType {
+        COPY_FILE_IN_META, COPY_DICT_OR_SNAPSHOT, RENAME_FOLDER_IN_HDFS, ADD_INTO_PROJECT, CHANGE_HTABLE_HOST, COPY_ACL, PURGE_AND_DISABLE
+    }
+
+    private static class Opt {
+        private OptType type;
+        private Object[] params;
+
+        private Opt(OptType type, Object[] params) {
+            this.type = type;
+            this.params = params;
+        }
+
+        public String toString() {
+            StringBuilder sb = new StringBuilder();
+            sb.append(type).append(":");
+            for (Object s : params)
+                sb.append(s).append(", ");
+            return sb.toString();
+        }
+
+    }
+
+    private static void showOpts() {
+        for (int i = 0; i < operations.size(); ++i) {
+            showOpt(operations.get(i));
+        }
+    }
+
+    private static void showOpt(Opt opt) {
+        logger.info("Operation: " + opt.toString());
+    }
+
+    private static void doOpts() throws IOException, InterruptedException {
+        int index = 0;
+        try {
+            for (; index < operations.size(); ++index) {
+                logger.info("Operation index :" + index);
+                doOpt(operations.get(index));
+            }
+        } catch (Exception e) {
+            logger.error("error met", e);
+            logger.info("Try undoing previous changes");
+            // undo:
+            for (int i = index; i >= 0; --i) {
+                try {
+                    undo(operations.get(i));
+                } catch (Exception ee) {
+                    logger.error("error met ", e);
+                    logger.info("Continue undoing...");
+                }
+            }
+
+            throw new RuntimeException("Cube moving failed");
+        }
+    }
+
+    @SuppressWarnings("checkstyle:methodlength")
+    private static void doOpt(Opt opt) throws IOException, InterruptedException {
+        logger.info("Executing operation: " + opt.toString());
+
+        switch (opt.type) {
+        case CHANGE_HTABLE_HOST: {
+            String tableName = (String) opt.params[0];
+            HTableDescriptor desc = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName));
+            hbaseAdmin.disableTable(tableName);
+            desc.setValue(IRealizationConstants.HTableTag, dstConfig.getMetadataUrlPrefix());
+            hbaseAdmin.modifyTable(tableName, desc);
+            hbaseAdmin.enableTable(tableName);
+            logger.info("CHANGE_HTABLE_HOST is completed");
+            break;
+        }
+        case COPY_FILE_IN_META: {
+            String item = (String) opt.params[0];
+            RawResource res = srcStore.getResource(item);
+            dstStore.putResource(item, res.inputStream, res.timestamp);
+            res.inputStream.close();
+            logger.info("Item " + item + " is copied");
+            break;
+        }
+        case COPY_DICT_OR_SNAPSHOT: {
+            String item = (String) opt.params[0];
+
+            if (item.toLowerCase().endsWith(".dict")) {
+                DictionaryManager dstDictMgr = DictionaryManager.getInstance(dstConfig);
+                DictionaryManager srcDicMgr = DictionaryManager.getInstance(srcConfig);
+                DictionaryInfo dictSrc = srcDicMgr.getDictionaryInfo(item);
+
+                long ts = dictSrc.getLastModified();
+                dictSrc.setLastModified(0);//to avoid resource store write conflict
+                Dictionary dictObj = dictSrc.getDictionaryObject().copyToAnotherMeta(srcConfig, dstConfig);
+                DictionaryInfo dictSaved = dstDictMgr.trySaveNewDict(dictObj, dictSrc);
+                dictSrc.setLastModified(ts);
+
+                if (dictSaved == dictSrc) {
+                    //no dup found, already saved to dest
+                    logger.info("Item " + item + " is copied");
+                } else {
+                    //dictSrc is rejected because of duplication
+                    //modify cube's dictionary path
+                    String cubeName = (String) opt.params[1];
+                    String cubeResPath = CubeInstance.concatResourcePath(cubeName);
+                    Serializer<CubeInstance> cubeSerializer = new JsonSerializer<CubeInstance>(CubeInstance.class);
+                    CubeInstance cube = dstStore.getResource(cubeResPath, CubeInstance.class, cubeSerializer);
+                    for (CubeSegment segment : cube.getSegments()) {
+                        for (Map.Entry<String, String> entry : 
segment.getDictionaries().entrySet()) {
+                            if (entry.getValue().equalsIgnoreCase(item)) {
+                                entry.setValue(dictSaved.getResourcePath());
+                            }
+                        }
+                    }
+                    dstStore.putResource(cubeResPath, cube, cubeSerializer);
+                    logger.info("Item " + item + " is dup, instead " + 
dictSaved.getResourcePath() + " is reused");
+                }
+
+            } else if (item.toLowerCase().endsWith(".snapshot")) {
+                SnapshotManager dstSnapMgr = SnapshotManager.getInstance(dstConfig);
+                SnapshotManager srcSnapMgr = SnapshotManager.getInstance(srcConfig);
+                SnapshotTable snapSrc = srcSnapMgr.getSnapshotTable(item);
+
+                long ts = snapSrc.getLastModified();
+                snapSrc.setLastModified(0);
+                SnapshotTable snapSaved = dstSnapMgr.trySaveNewSnapshot(snapSrc);
+                snapSrc.setLastModified(ts);
+
+                if (snapSaved == snapSrc) {
+                    //no dup found, already saved to dest
+                    logger.info("Item " + item + " is copied");
+
+                } else {
+                    String cubeName = (String) opt.params[1];
+                    String cubeResPath = CubeInstance.concatResourcePath(cubeName);
+                    Serializer<CubeInstance> cubeSerializer = new JsonSerializer<CubeInstance>(CubeInstance.class);
+                    CubeInstance cube = dstStore.getResource(cubeResPath, CubeInstance.class, cubeSerializer);
+                    for (CubeSegment segment : cube.getSegments()) {
+                        for (Map.Entry<String, String> entry : 
segment.getSnapshots().entrySet()) {
+                            if (entry.getValue().equalsIgnoreCase(item)) {
+                                entry.setValue(snapSaved.getResourcePath());
+                            }
+                        }
+                    }
+                    dstStore.putResource(cubeResPath, cube, cubeSerializer);
+                    logger.info("Item " + item + " is dup, instead " + 
snapSaved.getResourcePath() + " is reused");
+
+                }
+
+            } else {
+                logger.error("unknown item found: " + item);
+                logger.info("ignore it");
+            }
+
+            break;
+        }
+        case RENAME_FOLDER_IN_HDFS: {
+            String srcPath = (String) opt.params[0];
+            String dstPath = (String) opt.params[1];
+            renameHDFSPath(srcPath, dstPath);
+            logger.info("HDFS Folder renamed from " + srcPath + " to " + 
dstPath);
+            break;
+        }
+        case ADD_INTO_PROJECT: {
+            CubeInstance srcCube = (CubeInstance) opt.params[0];
+            String cubeName = (String) opt.params[1];
+            String projectName = (String) opt.params[2];
+            String modelName = srcCube.getDescriptor().getModelName();
+
+            String projectResPath = ProjectInstance.concatResourcePath(projectName);
+            Serializer<ProjectInstance> projectSerializer = new JsonSerializer<ProjectInstance>(ProjectInstance.class);
+            ProjectInstance project = dstStore.getResource(projectResPath, ProjectInstance.class, projectSerializer);
+
+            project.addModel(modelName);
+            project.removeRealization(RealizationType.CUBE, cubeName);
+            project.addRealizationEntry(RealizationType.CUBE, cubeName);
+
+            dstStore.putResource(projectResPath, project, projectSerializer);
+            logger.info("Project instance for " + projectName + " is 
corrected");
+            break;
+        }
+        case COPY_ACL: {
+            String cubeId = (String) opt.params[0];
+            String modelId = (String) opt.params[1];
+            String projectName = (String) opt.params[2];
+            String projectResPath = ProjectInstance.concatResourcePath(projectName);
+            Serializer<ProjectInstance> projectSerializer = new JsonSerializer<ProjectInstance>(ProjectInstance.class);
+            ProjectInstance project = dstStore.getResource(projectResPath, ProjectInstance.class, projectSerializer);
+            String projUUID = project.getUuid();
+            HTableInterface srcAclHtable = null;
+            HTableInterface destAclHtable = null;
+            try {
+                srcAclHtable = HBaseConnection.get(srcConfig.getStorageUrl()).getTable(srcConfig.getMetadataUrlPrefix() + ACL_TABLE_NAME);
+                destAclHtable = HBaseConnection.get(dstConfig.getStorageUrl()).getTable(dstConfig.getMetadataUrlPrefix() + ACL_TABLE_NAME);
+
+                // cube acl
+                Result result = srcAclHtable.get(new Get(Bytes.toBytes(cubeId)));
+                if (result.listCells() != null) {
+                    for (Cell cell : result.listCells()) {
+                        byte[] family = CellUtil.cloneFamily(cell);
+                        byte[] column = CellUtil.cloneQualifier(cell);
+                        byte[] value = CellUtil.cloneValue(cell);
+
+                        // use the target project uuid as the parent
+                        if (Bytes.toString(family).equals(ACL_INFO_FAMILY) && 
Bytes.toString(column).equals(ACL_INFO_FAMILY_PARENT_COLUMN)) {
+                            String valueString = "{\"id\":\"" + projUUID + 
"\",\"type\":\"org.apache.kylin.metadata.project.ProjectInstance\"}";
+                            value = Bytes.toBytes(valueString);
+                        }
+                        Put put = new Put(Bytes.toBytes(cubeId));
+                        put.add(family, column, value);
+                        destAclHtable.put(put);
+                    }
+                }
+                destAclHtable.flushCommits();
+            } finally {
+                IOUtils.closeQuietly(srcAclHtable);
+                IOUtils.closeQuietly(destAclHtable);
+            }
+            break;
+        }
+        case PURGE_AND_DISABLE: {
+            String cubeName = (String) opt.params[0];
+            String cubeResPath = CubeInstance.concatResourcePath(cubeName);
+            Serializer<CubeInstance> cubeSerializer = new 
JsonSerializer<CubeInstance>(CubeInstance.class);
+            CubeInstance cube = srcStore.getResource(cubeResPath, 
CubeInstance.class, cubeSerializer);
+            cube.getSegments().clear();
+            cube.setStatus(RealizationStatusEnum.DISABLED);
+            srcStore.putResource(cubeResPath, cube, cubeSerializer);
+            logger.info("Cube " + cubeName + " is purged and disabled in " + 
srcConfig.getMetadataUrl());
+
+            break;
+        }
+        default: {
+            //do nothing
+            break;
+        }
+        }
+    }
+
+    private static void undo(Opt opt) throws IOException, InterruptedException {
+        logger.info("Undo operation: " + opt.toString());
+
+        switch (opt.type) {
+        case CHANGE_HTABLE_HOST: {
+            String tableName = (String) opt.params[0];
+            HTableDescriptor desc = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName));
+            hbaseAdmin.disableTable(tableName);
+            desc.setValue(IRealizationConstants.HTableTag, srcConfig.getMetadataUrlPrefix());
+            hbaseAdmin.modifyTable(tableName, desc);
+            hbaseAdmin.enableTable(tableName);
+            break;
+        }
+        case COPY_FILE_IN_META: {
+            // no harm
+            logger.info("Undo for COPY_FILE_IN_META is ignored");
+            break;
+        }
+        case COPY_DICT_OR_SNAPSHOT: {
+            // no harm
+            logger.info("Undo for COPY_DICT_OR_SNAPSHOT is ignored");
+            break;
+        }
+        case RENAME_FOLDER_IN_HDFS: {
+            String srcPath = (String) opt.params[1];
+            String dstPath = (String) opt.params[0];
+
+            if (hdfsFS.exists(new Path(srcPath)) && !hdfsFS.exists(new 
Path(dstPath))) {
+                renameHDFSPath(srcPath, dstPath);
+                logger.info("HDFS Folder renamed from " + srcPath + " to " + 
dstPath);
+            }
+            break;
+        }
+        case ADD_INTO_PROJECT: {
+            logger.info("Undo for ADD_INTO_PROJECT is ignored");
+            break;
+        }
+        case COPY_ACL: {
+            String cubeId = (String) opt.params[0];
+            String modelId = (String) opt.params[1];
+            HTableInterface destAclHtable = null;
+            try {
+                destAclHtable = HBaseConnection.get(dstConfig.getStorageUrl()).getTable(dstConfig.getMetadataUrlPrefix() + ACL_TABLE_NAME);
+
+                destAclHtable.delete(new Delete(Bytes.toBytes(cubeId)));
+                destAclHtable.delete(new Delete(Bytes.toBytes(modelId)));
+                destAclHtable.flushCommits();
+            } finally {
+                IOUtils.closeQuietly(destAclHtable);
+            }
+            break;
+        }
+        case PURGE_AND_DISABLE: {
+            logger.info("Undo for PURGE_AND_DISABLE is not supported");
+            break;
+        }
+        default: {
+            //do nothing
+            break;
+        }
+        }
+    }
+
+    private static void updateMeta(KylinConfig config){
+        String[] nodes = config.getRestServers();
+        for (String node : nodes) {
+            RestClient restClient = new RestClient(node);
+            try {
+                logger.info("update meta cache for " + node);
+                restClient.wipeCache(Broadcaster.SYNC_ALL, Broadcaster.Event.UPDATE.getType(), Broadcaster.SYNC_ALL);
+            } catch (IOException e) {
+                logger.error(e.getMessage());
+            }
+        }
+    }
+
+    private static void renameHDFSPath(String srcPath, String dstPath) throws IOException, InterruptedException {
+        int nRetry = 0;
+        int sleepTime = 5000;
+        while (!hdfsFS.rename(new Path(srcPath), new Path(dstPath))) {
+            ++nRetry;
+            if (nRetry > 3) {
+                throw new InterruptedException("Cannot rename folder " + 
srcPath + " to folder " + dstPath);
+            } else {
+                Thread.sleep(sleepTime * nRetry * nRetry);
+            }
+        }
+    }
+}
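
For reference, a hypothetical invocation of the relocated migration tool, assuming it is launched through kylin.sh like the other tools touched in this commit, and following the argument order printed by usage() above (all values are placeholders):

    ${KYLIN_HOME}/bin/kylin.sh org.apache.kylin.tool.CubeMigrationCLI \
        <srcKylinConfigUri> <dstKylinConfigUri> sample_cube target_project true false false true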

http://git-wip-us.apache.org/repos/asf/kylin/blob/d2fde806/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCheckCLI.java
----------------------------------------------------------------------
diff --git a/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCheckCLI.java b/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCheckCLI.java
new file mode 100644
index 0000000..fe348ba
--- /dev/null
+++ b/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCheckCLI.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *  
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.tool;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.OptionsHelper;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.metadata.realization.IRealizationConstants;
+import org.apache.kylin.storage.hbase.HBaseConnection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+/**
+ * <p/>
+ * This tool serves for the purpose of
+ * checking the "KYLIN_HOST" property to be consistent with the dst's 
MetadataUrlPrefix
+ * for all of cube segments' corresponding HTables after migrating a cube
+ * <p/>
+ */
+public class CubeMigrationCheckCLI {
+
+    private static final Logger logger = LoggerFactory.getLogger(CubeMigrationCheckCLI.class);
+
+    private static final Option OPTION_FIX = OptionBuilder.withArgName("fix").hasArg().isRequired(false).withDescription("Fix the inconsistent cube segments' HOST").create("fix");
+
+    private static final Option OPTION_DST_CFG_URI = OptionBuilder.withArgName("dstCfgUri").hasArg().isRequired(false).withDescription("The KylinConfig of the cube’s new home").create("dstCfgUri");
+
+    private static final Option OPTION_CUBE = OptionBuilder.withArgName("cube").hasArg().isRequired(false).withDescription("The name of cube migrated").create("cube");
+
+    private KylinConfig dstCfg;
+    private HBaseAdmin hbaseAdmin;
+
+    private List<String> issueExistHTables;
+    private List<String> inconsistentHTables;
+
+    private boolean ifFix = false;
+
+    public static void main(String[] args) throws ParseException, IOException {
+
+        OptionsHelper optionsHelper = new OptionsHelper();
+
+        Options options = new Options();
+        options.addOption(OPTION_FIX);
+        options.addOption(OPTION_DST_CFG_URI);
+        options.addOption(OPTION_CUBE);
+
+        boolean ifFix = false;
+        String dstCfgUri;
+        String cubeName;
+        logger.info("jobs args: " + Arrays.toString(args));
+        try {
+
+            optionsHelper.parseOptions(options, args);
+
+            logger.info("options: '" + options.toString() + "'");
+            logger.info("option value 'fix': '" + 
optionsHelper.getOptionValue(OPTION_FIX) + "'");
+            ifFix = 
Boolean.parseBoolean(optionsHelper.getOptionValue(OPTION_FIX));
+
+            logger.info("option value 'dstCfgUri': '" + 
optionsHelper.getOptionValue(OPTION_DST_CFG_URI) + "'");
+            dstCfgUri = optionsHelper.getOptionValue(OPTION_DST_CFG_URI);
+
+            logger.info("option value 'cube': '" + 
optionsHelper.getOptionValue(OPTION_CUBE) + "'");
+            cubeName = optionsHelper.getOptionValue(OPTION_CUBE);
+
+        } catch (ParseException e) {
+            optionsHelper.printUsage(CubeMigrationCheckCLI.class.getName(), options);
+            throw e;
+        }
+
+        KylinConfig kylinConfig;
+        if (dstCfgUri == null) {
+            kylinConfig = KylinConfig.getInstanceFromEnv();
+        } else {
+            kylinConfig = KylinConfig.createInstanceFromUri(dstCfgUri);
+        }
+
+        CubeMigrationCheckCLI checkCLI = new CubeMigrationCheckCLI(kylinConfig, ifFix);
+        checkCLI.execute(cubeName);
+    }
+
+    public void execute() throws IOException {
+        execute(null);
+    }
+
+    public void execute(String cubeName) throws IOException {
+        if (cubeName == null) {
+            checkAll();
+        } else {
+            checkCube(cubeName);
+        }
+        fixInconsistent();
+        printIssueExistingHTables();
+    }
+
+    public CubeMigrationCheckCLI(KylinConfig kylinConfig, Boolean isFix) throws IOException {
+        this.dstCfg = kylinConfig;
+        this.ifFix = isFix;
+
+        Configuration conf = HBaseConnection.getCurrentHBaseConfiguration();
+        hbaseAdmin = new HBaseAdmin(conf);
+
+        issueExistHTables = Lists.newArrayList();
+        inconsistentHTables = Lists.newArrayList();
+    }
+
+    public void checkCube(String cubeName) {
+        List<String> segFullNameList = Lists.newArrayList();
+
+        CubeInstance cube = CubeManager.getInstance(dstCfg).getCube(cubeName);
+        addHTableNamesForCube(cube, segFullNameList);
+
+        check(segFullNameList);
+    }
+
+    public void checkAll() {
+        List<String> segFullNameList = Lists.newArrayList();
+
+        CubeManager cubeMgr = CubeManager.getInstance(dstCfg);
+        for (CubeInstance cube : cubeMgr.listAllCubes()) {
+            addHTableNamesForCube(cube, segFullNameList);
+        }
+
+        check(segFullNameList);
+    }
+
+    public void addHTableNamesForCube(CubeInstance cube, List<String> 
segFullNameList) {
+        for (CubeSegment seg : cube.getSegments()) {
+            String tableName = seg.getStorageLocationIdentifier();
+            segFullNameList.add(tableName + "," + cube.getName());
+        }
+    }
+
+    public void check(List<String> segFullNameList) {
+        issueExistHTables = Lists.newArrayList();
+        inconsistentHTables = Lists.newArrayList();
+
+        for (String segFullName : segFullNameList) {
+            String[] sepNameList = segFullName.split(",");
+            try {
+                HTableDescriptor hTableDescriptor = hbaseAdmin.getTableDescriptor(TableName.valueOf(sepNameList[0]));
+                String host = hTableDescriptor.getValue(IRealizationConstants.HTableTag);
+                if (!dstCfg.getMetadataUrlPrefix().equalsIgnoreCase(host)) {
+                    inconsistentHTables.add(segFullName);
+                }
+            } catch (IOException e) {
+                issueExistHTables.add(segFullName);
+                continue;
+            }
+        }
+    }
+
+    public void fixInconsistent() throws IOException {
+        if (ifFix == true) {
+            for (String segFullName : inconsistentHTables) {
+                String[] sepNameList = segFullName.split(",");
+                HTableDescriptor desc = hbaseAdmin.getTableDescriptor(TableName.valueOf(sepNameList[0]));
+                logger.info("Change the host of htable " + sepNameList[0] + "belonging to cube " + sepNameList[1] + " from " + desc.getValue(IRealizationConstants.HTableTag) + " to " + dstCfg.getMetadataUrlPrefix());
+                hbaseAdmin.disableTable(sepNameList[0]);
+                desc.setValue(IRealizationConstants.HTableTag, dstCfg.getMetadataUrlPrefix());
+                hbaseAdmin.modifyTable(sepNameList[0], desc);
+                hbaseAdmin.enableTable(sepNameList[0]);
+            }
+        } else {
+            logger.info("------ Inconsistent HTables Needed To Be Fixed 
------");
+            for (String hTable : inconsistentHTables) {
+                String[] sepNameList = hTable.split(",");
+                logger.info(sepNameList[0] + " belonging to cube " + 
sepNameList[1]);
+            }
+            logger.info("----------------------------------------------------");
+        }
+    }
+
+    public void printIssueExistingHTables() {
+        logger.info("------ HTables exist issues in hbase : not existing, 
metadata broken ------");
+        for (String segFullName : issueExistHTables) {
+            String[] sepNameList = segFullName.split(",");
+            logger.error(sepNameList[0] + " belonging to cube " + 
sepNameList[1] + " has some issues and cannot be read successfully!!!");
+        }
+        logger.info("----------------------------------------------------");
+    }
+}
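
A hypothetical check run using the commons-cli options defined above (-fix, -dstCfgUri, -cube); option values are placeholders, and omitting -dstCfgUri falls back to KylinConfig.getInstanceFromEnv() as in main():

    ${KYLIN_HOME}/bin/kylin.sh org.apache.kylin.tool.CubeMigrationCheckCLI -cube sample_cube -fix true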

http://git-wip-us.apache.org/repos/asf/kylin/blob/d2fde806/tool/src/main/java/org/apache/kylin/tool/ExtendCubeToHybridCLI.java
----------------------------------------------------------------------
diff --git a/tool/src/main/java/org/apache/kylin/tool/ExtendCubeToHybridCLI.java b/tool/src/main/java/org/apache/kylin/tool/ExtendCubeToHybridCLI.java
new file mode 100644
index 0000000..27fa973
--- /dev/null
+++ b/tool/src/main/java/org/apache/kylin/tool/ExtendCubeToHybridCLI.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *  
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.tool;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.JsonSerializer;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.persistence.Serializer;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.DateFormat;
+import org.apache.kylin.cube.CubeDescManager;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.metadata.MetadataManager;
+import org.apache.kylin.metadata.model.DataModelDesc;
+import org.apache.kylin.metadata.model.IEngineAware;
+import org.apache.kylin.metadata.model.IStorageAware;
+import org.apache.kylin.metadata.project.ProjectInstance;
+import org.apache.kylin.metadata.project.ProjectManager;
+import org.apache.kylin.metadata.project.RealizationEntry;
+import org.apache.kylin.metadata.realization.RealizationStatusEnum;
+import org.apache.kylin.metadata.realization.RealizationType;
+import org.apache.kylin.storage.hbase.HBaseConnection;
+import org.apache.kylin.storage.hybrid.HybridInstance;
+import org.apache.kylin.storage.hybrid.HybridManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Created by dongli on 12/29/15.
+ */
+public class ExtendCubeToHybridCLI {
+    public static final String ACL_INFO_FAMILY = "i";
+    private static final String CUBE_POSTFIX = "_old";
+    private static final String HYBRID_POSTFIX = "_hybrid";
+    private static final Logger logger = LoggerFactory.getLogger(ExtendCubeToHybridCLI.class);
+    private static final String ACL_INFO_FAMILY_PARENT_COLUMN = "p";
+
+    private KylinConfig kylinConfig;
+    private CubeManager cubeManager;
+    private CubeDescManager cubeDescManager;
+    private MetadataManager metadataManager;
+    private ResourceStore store;
+
+    public ExtendCubeToHybridCLI() {
+        this.kylinConfig = KylinConfig.getInstanceFromEnv();
+        this.store = ResourceStore.getStore(kylinConfig);
+        this.cubeManager = CubeManager.getInstance(kylinConfig);
+        this.cubeDescManager = CubeDescManager.getInstance(kylinConfig);
+        this.metadataManager = MetadataManager.getInstance(kylinConfig);
+    }
+
+    public static void main(String[] args) throws Exception {
+        if (args.length != 2 && args.length != 3) {
+            System.out.println("Usage: ExtendCubeToHybridCLI project cube 
[partition_date]");
+            return;
+        }
+
+        ExtendCubeToHybridCLI tool = new ExtendCubeToHybridCLI();
+
+        String projectName = args[0];
+        String cubeName = args[1];
+        String partitionDate = args.length == 3 ? args[2] : null;
+
+        try {
+            tool.createFromCube(projectName, cubeName, partitionDate);
+            tool.verify();
+            logger.info("Job Finished.");
+        } catch (Exception e) {
+            e.printStackTrace();
+            logger.error("Job Aborted.", e.getMessage());
+        }
+    }
+
+    private boolean validateCubeInstance(CubeInstance cubeInstance) {
+        if (cubeInstance == null) {
+            logger.error("This cube does not exist.");
+            return false;
+        }
+        if (cubeInstance.getSegments().isEmpty()) {
+            logger.error("No segments in this cube, no need to extend.");
+            return false;
+        }
+        return true;
+    }
+
+    public void createFromCube(String projectName, String cubeName, String 
partitionDateStr) throws Exception {
+        logger.info("Create hybrid for cube[" + cubeName + "], project[" + 
projectName + "], partition_date[" + partitionDateStr + "].");
+
+        CubeInstance cubeInstance = cubeManager.getCube(cubeName);
+        if (!validateCubeInstance(cubeInstance)) {
+            return;
+        }
+
+        CubeDesc cubeDesc = 
cubeDescManager.getCubeDesc(cubeInstance.getDescName());
+        DataModelDesc dataModelDesc = 
metadataManager.getDataModelDesc(cubeDesc.getModelName());
+        if 
(StringUtils.isEmpty(dataModelDesc.getPartitionDesc().getPartitionDateColumn()))
 {
+            logger.error("No incremental cube, no need to extend.");
+            return;
+        }
+
+        String owner = cubeInstance.getOwner();
+        long partitionDate = partitionDateStr != null ? 
DateFormat.stringToMillis(partitionDateStr) : 0;
+
+        // get new name for old cube and cube_desc
+        String newCubeDescName = renameCube(cubeDesc.getName());
+        String newCubeInstanceName = renameCube(cubeInstance.getName());
+        while (cubeDescManager.getCubeDesc(newCubeDescName) != null)
+            newCubeDescName = renameCube(newCubeDescName);
+        while (cubeManager.getCube(newCubeInstanceName) != null)
+            newCubeInstanceName = renameCube(newCubeInstanceName);
+
+        // create new cube_instance for old segments
+        CubeInstance newCubeInstance = CubeInstance.getCopyOf(cubeInstance);
+        newCubeInstance.setName(newCubeInstanceName);
+        newCubeInstance.setDescName(newCubeDescName);
+        newCubeInstance.updateRandomUuid();
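+        // remove from the copy any segment that starts at, or reaches past, the partition date; only fully historical segments stay in the copied cube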
+        Iterator<CubeSegment> segmentIterator = 
newCubeInstance.getSegments().iterator();
+        CubeSegment currentSeg = null;
+        while (segmentIterator.hasNext()) {
+            currentSeg = segmentIterator.next();
+            if (partitionDateStr != null && (currentSeg.getDateRangeStart() >= 
partitionDate || currentSeg.getDateRangeEnd() > partitionDate)) {
+                segmentIterator.remove();
+                logger.info("CubeSegment[" + currentSeg + "] was removed.");
+            }
+        }
+        if (partitionDateStr != null && partitionDate != 
currentSeg.getDateRangeEnd()) {
+            logger.error("PartitionDate must be end date of one segment.");
+            return;
+        }
+        if (currentSeg != null && partitionDateStr == null)
+            partitionDate = currentSeg.getDateRangeEnd();
+
+        cubeManager.createCube(newCubeInstance, projectName, owner);
+        logger.info("CubeInstance was saved at: " + 
newCubeInstance.getResourcePath());
+
+        // create new cube for old segments
+        CubeDesc newCubeDesc = CubeDesc.getCopyOf(cubeDesc);
+        newCubeDesc.setName(newCubeDescName);
+        newCubeDesc.updateRandomUuid();
+        newCubeDesc.init(kylinConfig, metadataManager.getAllTablesMap());
+        newCubeDesc.setPartitionDateEnd(partitionDate);
+        newCubeDesc.calculateSignature();
+        cubeDescManager.createCubeDesc(newCubeDesc);
+        logger.info("CubeDesc was saved at: " + newCubeDesc.getResourcePath());
+
+        // update old cube_desc to new-version metadata
+        cubeDesc.setPartitionDateStart(partitionDate);
+        cubeDesc.setEngineType(IEngineAware.ID_MR_V2);
+        cubeDesc.setStorageType(IStorageAware.ID_SHARDED_HBASE);
+        cubeDesc.calculateSignature();
+        cubeDescManager.updateCubeDesc(cubeDesc);
+        logger.info("CubeDesc was saved at: " + cubeDesc.getResourcePath());
+
+        // clear segments for old cube
+        cubeInstance.setSegments(new ArrayList<CubeSegment>());
+        cubeInstance.setStatus(RealizationStatusEnum.DISABLED);
+        store.putResource(cubeInstance.getResourcePath(), cubeInstance, 
CubeManager.CUBE_SERIALIZER);
+        logger.info("CubeInstance was saved at: " + 
cubeInstance.getResourcePath());
+
+        // create hybrid model for these two cubes
+        List<RealizationEntry> realizationEntries = 
Lists.newArrayListWithCapacity(2);
+        realizationEntries.add(RealizationEntry.create(RealizationType.CUBE, 
cubeInstance.getName()));
+        realizationEntries.add(RealizationEntry.create(RealizationType.CUBE, 
newCubeInstance.getName()));
+        HybridInstance hybridInstance = HybridInstance.create(kylinConfig, 
renameHybrid(cubeInstance.getName()), realizationEntries);
+        store.putResource(hybridInstance.getResourcePath(), hybridInstance, 
HybridManager.HYBRID_SERIALIZER);
+        
ProjectManager.getInstance(kylinConfig).moveRealizationToProject(RealizationType.HYBRID,
 hybridInstance.getName(), projectName, owner);
+        logger.info("HybridInstance was saved at: " + 
hybridInstance.getResourcePath());
+
+        // copy Acl from old cube to new cube
+        copyAcl(cubeInstance.getId(), newCubeInstance.getId(), projectName);
+        logger.info("Acl copied from [" + cubeName + "] to [" + 
newCubeInstanceName + "].");
+    }
+
+    private void verify() {
+        CubeDescManager.clearCache();
+        CubeDescManager.getInstance(kylinConfig);
+
+        CubeManager.clearCache();
+        CubeManager.getInstance(kylinConfig);
+
+        ProjectManager.clearCache();
+        ProjectManager.getInstance(kylinConfig);
+
+        HybridManager.clearCache();
+        HybridManager.getInstance(kylinConfig);
+    }
+
+    private String renameCube(String origName) {
+        return origName + CUBE_POSTFIX;
+    }
+
+    private String renameHybrid(String origName) {
+        return origName + HYBRID_POSTFIX;
+    }
+
+    private void copyAcl(String origCubeId, String newCubeId, String 
projectName) throws Exception {
+        String projectResPath = 
ProjectInstance.concatResourcePath(projectName);
+        Serializer<ProjectInstance> projectSerializer = new 
JsonSerializer<ProjectInstance>(ProjectInstance.class);
+        ProjectInstance project = store.getResource(projectResPath, 
ProjectInstance.class, projectSerializer);
+        String projUUID = project.getUuid();
+        HTableInterface aclHtable = null;
+        try {
+            aclHtable = 
HBaseConnection.get(kylinConfig.getStorageUrl()).getTable(kylinConfig.getMetadataUrlPrefix()
 + "_acl");
+
+            // cube acl
+            Result result = aclHtable.get(new Get(Bytes.toBytes(origCubeId)));
+            if (result.listCells() != null) {
+                for (Cell cell : result.listCells()) {
+                    byte[] family = CellUtil.cloneFamily(cell);
+                    byte[] column = CellUtil.cloneQualifier(cell);
+                    byte[] value = CellUtil.cloneValue(cell);
+
+                    // use the target project uuid as the parent
+                    if (Bytes.toString(family).equals(ACL_INFO_FAMILY) && 
Bytes.toString(column).equals(ACL_INFO_FAMILY_PARENT_COLUMN)) {
+                        String valueString = "{\"id\":\"" + projUUID + 
"\",\"type\":\"org.apache.kylin.metadata.project.ProjectInstance\"}";
+                        value = Bytes.toBytes(valueString);
+                    }
+                    Put put = new Put(Bytes.toBytes(newCubeId));
+                    put.add(family, column, value);
+                    aclHtable.put(put);
+                }
+            }
+            aclHtable.flushCommits();
+        } finally {
+            IOUtils.closeQuietly(aclHtable);
+        }
+    }
+}
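
For illustration only (not part of this patch): the tool above could presumably be launched through kylin.sh like other Kylin tool classes with a main() method. The project, cube and partition date below are placeholders, and the accepted date format is whatever DateFormat.stringToMillis() supports in your build; if the optional partition date is omitted, the split point defaults to the end date of the cube's last segment, as the code above shows.

    ${KYLIN_HOME}/bin/kylin.sh org.apache.kylin.tool.ExtendCubeToHybridCLI my_project my_cube 2016-01-01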

http://git-wip-us.apache.org/repos/asf/kylin/blob/d2fde806/tool/src/main/java/org/apache/kylin/tool/MetadataCleanupJob.java
----------------------------------------------------------------------
diff --git a/tool/src/main/java/org/apache/kylin/tool/MetadataCleanupJob.java 
b/tool/src/main/java/org/apache/kylin/tool/MetadataCleanupJob.java
new file mode 100644
index 0000000..94962ff
--- /dev/null
+++ b/tool/src/main/java/org/apache/kylin/tool/MetadataCleanupJob.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *  
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.tool;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.NavigableSet;
+import java.util.Set;
+
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.job.dao.ExecutableDao;
+import org.apache.kylin.job.dao.ExecutableOutputPO;
+import org.apache.kylin.job.dao.ExecutablePO;
+import org.apache.kylin.job.execution.ExecutableState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+/**
+ */
+public class MetadataCleanupJob extends AbstractHadoopJob {
+
+    @SuppressWarnings("static-access")
+    private static final Option OPTION_DELETE = 
OptionBuilder.withArgName("delete").hasArg().isRequired(false).withDescription("Delete
 the unused metadata").create("delete");
+
+    protected static final Logger logger = 
LoggerFactory.getLogger(MetadataCleanupJob.class);
+
+    boolean delete = false;
+
+    private KylinConfig config = null;
+
+    public static final long TIME_THREADSHOLD = 2 * 24 * 3600 * 1000L; // 2 
days
+    public static final long TIME_THREADSHOLD_FOR_JOB = 30 * 24 * 3600 * 
1000L; // 30 days
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.hadoop.util.Tool#run(java.lang.String[])
+     */
+    @Override
+    public int run(String[] args) throws Exception {
+        Options options = new Options();
+
+        logger.info("jobs args: " + Arrays.toString(args));
+        options.addOption(OPTION_DELETE);
+        parseOptions(options, args);
+
+        logger.info("options: '" + getOptionsAsString() + "'");
+        logger.info("delete option value: '" + getOptionValue(OPTION_DELETE) + 
"'");
+        delete = Boolean.parseBoolean(getOptionValue(OPTION_DELETE));
+
+        config = KylinConfig.getInstanceFromEnv();
+
+        cleanup();
+
+        return 0;
+    }
+
+    private ResourceStore getStore() {
+        return ResourceStore.getStore(config);
+    }
+
+    private boolean isOlderThanThreshold(long resourceTime) {
+        long currentTime = System.currentTimeMillis();
+
+        if (currentTime - resourceTime > TIME_THREADSHOLD)
+            return true;
+        return false;
+    }
+
+    public void cleanup() throws Exception {
+        CubeManager cubeManager = CubeManager.getInstance(config);
+
+        List<String> toDeleteResource = Lists.newArrayList();
+
+        // two level resources, snapshot tables and cube statistics
+        for (String resourceRoot : new String[] { 
ResourceStore.SNAPSHOT_RESOURCE_ROOT, ResourceStore.CUBE_STATISTICS_ROOT }) {
+            NavigableSet<String> snapshotTables = 
getStore().listResources(resourceRoot);
+
+            if (snapshotTables != null) {
+                for (String snapshotTable : snapshotTables) {
+                    NavigableSet<String> snapshotNames = 
getStore().listResources(snapshotTable);
+                    if (snapshotNames != null)
+                        for (String snapshot : snapshotNames) {
+                            if 
(isOlderThanThreshold(getStore().getResourceTimestamp(snapshot)))
+                                toDeleteResource.add(snapshot);
+                        }
+                }
+            }
+        }
+
+        // three level resources, only dictionaries
+        NavigableSet<String> dictTables = 
getStore().listResources(ResourceStore.DICT_RESOURCE_ROOT);
+
+        if (dictTables != null) {
+            for (String table : dictTables) {
+                NavigableSet<String> tableColNames = 
getStore().listResources(table);
+                if (tableColNames != null)
+                    for (String tableCol : tableColNames) {
+                        NavigableSet<String> dictionaries = 
getStore().listResources(tableCol);
+                        if (dictionaries != null)
+                            for (String dict : dictionaries)
+                                if 
(isOlderThanThreshold(getStore().getResourceTimestamp(dict)))
+                                    toDeleteResource.add(dict);
+                    }
+            }
+        }
+
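+        // collect resources still referenced by existing cube segments (snapshots, dictionaries, statistics); these must never be deleted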
+        Set<String> activeResourceList = Sets.newHashSet();
+        for (org.apache.kylin.cube.CubeInstance cube : 
cubeManager.listAllCubes()) {
+            for (org.apache.kylin.cube.CubeSegment segment : 
cube.getSegments()) {
+                activeResourceList.addAll(segment.getSnapshotPaths());
+                activeResourceList.addAll(segment.getDictionaryPaths());
+                activeResourceList.add(segment.getStatisticsResourcePath());
+            }
+        }
+
+        toDeleteResource.removeAll(activeResourceList);
+
+        // delete old and completed jobs
+        ExecutableDao executableDao = 
ExecutableDao.getInstance(KylinConfig.getInstanceFromEnv());
+        List<ExecutablePO> allExecutable = executableDao.getJobs();
+        for (ExecutablePO executable : allExecutable) {
+            long lastModified = executable.getLastModified();
+            ExecutableOutputPO output = 
executableDao.getJobOutput(executable.getUuid());
+            if (System.currentTimeMillis() - lastModified > 
TIME_THREADSHOLD_FOR_JOB && 
(ExecutableState.SUCCEED.toString().equals(output.getStatus()) || 
ExecutableState.DISCARDED.toString().equals(output.getStatus()))) {
+                toDeleteResource.add(ResourceStore.EXECUTE_RESOURCE_ROOT + "/" 
+ executable.getUuid());
+                
toDeleteResource.add(ResourceStore.EXECUTE_OUTPUT_RESOURCE_ROOT + "/" + 
executable.getUuid());
+
+                for (ExecutablePO task : executable.getTasks()) {
+                    
toDeleteResource.add(ResourceStore.EXECUTE_OUTPUT_RESOURCE_ROOT + "/" + 
task.getUuid());
+                }
+            }
+        }
+
+        if (toDeleteResource.size() > 0) {
+            logger.info("The following resources have no reference or are too old, and will be cleaned from the metadata store: \n");
+
+            for (String s : toDeleteResource) {
+                logger.info(s);
+                if (delete) {
+                    getStore().deleteResource(s);
+                }
+            }
+        } else {
+            logger.info("No resource to be cleaned up from the metadata store.");
+        }
+
+    }
+
+    public static void main(String[] args) throws Exception {
+        int exitCode = ToolRunner.run(new MetadataCleanupJob(), args);
+        System.exit(exitCode);
+    }
+}
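
A hedged usage sketch (the commands are illustrative, not taken from this patch): run directly through kylin.sh, the job only lists cleanup candidates by default and deletes them when the --delete option parsed above is set to true.

    # dry run: print unreferenced resources older than the built-in thresholds
    ${KYLIN_HOME}/bin/kylin.sh org.apache.kylin.tool.MetadataCleanupJob

    # actually delete them
    ${KYLIN_HOME}/bin/kylin.sh org.apache.kylin.tool.MetadataCleanupJob --delete true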

http://git-wip-us.apache.org/repos/asf/kylin/blob/d2fde806/tool/src/main/java/org/apache/kylin/tool/StorageCleanupJob.java
----------------------------------------------------------------------
diff --git a/tool/src/main/java/org/apache/kylin/tool/StorageCleanupJob.java 
b/tool/src/main/java/org/apache/kylin/tool/StorageCleanupJob.java
new file mode 100644
index 0000000..c1ff753
--- /dev/null
+++ b/tool/src/main/java/org/apache/kylin/tool/StorageCleanupJob.java
@@ -0,0 +1,364 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *  
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.tool;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.StringReader;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.regex.Pattern;
+
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.AbstractApplication;
+import org.apache.kylin.common.util.CliCommandExecutor;
+import org.apache.kylin.common.util.OptionsHelper;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.JobBuilderSupport;
+import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableState;
+import org.apache.kylin.job.manager.ExecutableManager;
+import org.apache.kylin.metadata.realization.IRealizationConstants;
+import org.apache.kylin.storage.hbase.util.HiveCmdBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class StorageCleanupJob extends AbstractApplication {
+
+    @SuppressWarnings("static-access")
+    protected static final Option OPTION_DELETE = 
OptionBuilder.withArgName("delete").hasArg().isRequired(false).withDescription("Delete
 the unused storage").create("delete");
+    protected static final Option OPTION_FORCE = 
OptionBuilder.withArgName("force").hasArg().isRequired(false).withDescription("Warning:
 will delete any intermediate hive tables").create("force");
+
+    protected static final Logger logger = 
LoggerFactory.getLogger(StorageCleanupJob.class);
+    public static final int TIME_THRESHOLD_DELETE_HTABLE = 10; // Unit minute
+
+    protected boolean delete = false;
+    protected boolean force = false;
+    protected static ExecutableManager executableManager = 
ExecutableManager.getInstance(KylinConfig.getInstanceFromEnv());
+
+    private void cleanUnusedHBaseTables(Configuration conf) throws IOException 
{
+        CubeManager cubeMgr = 
CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
+        long TIME_THREADSHOLD = 
KylinConfig.getInstanceFromEnv().getStorageCleanupTimeThreshold();
+        // get all kylin hbase tables
+        HBaseAdmin hbaseAdmin = new HBaseAdmin(conf);
+        String tableNamePrefix = 
IRealizationConstants.SharedHbaseStorageLocationPrefix;
+        HTableDescriptor[] tableDescriptors = 
hbaseAdmin.listTables(tableNamePrefix + ".*");
+        List<String> allTablesNeedToBeDropped = new ArrayList<String>();
+        for (HTableDescriptor desc : tableDescriptors) {
+            String host = desc.getValue(IRealizationConstants.HTableTag);
+            String creationTime = 
desc.getValue(IRealizationConstants.HTableCreationTime);
+            if 
(KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix().equalsIgnoreCase(host))
 {
+                // only drop HTables that belong to this Kylin instance and were created longer ago than the cleanup threshold
+                if (StringUtils.isEmpty(creationTime) || 
(System.currentTimeMillis() - Long.valueOf(creationTime) > TIME_THREADSHOLD)) {
+                    
allTablesNeedToBeDropped.add(desc.getTableName().getNameAsString());
+                } else {
+                    logger.info("Exclude table " + 
desc.getTableName().getNameAsString() + " from drop list, as it is newly 
created");
+                }
+            }
+        }
+
+        // remove every segment htable from drop list
+        for (CubeInstance cube : cubeMgr.listAllCubes()) {
+            for (CubeSegment seg : cube.getSegments()) {
+                String tablename = seg.getStorageLocationIdentifier();
+                if (allTablesNeedToBeDropped.contains(tablename)) {
+                    allTablesNeedToBeDropped.remove(tablename);
+                    logger.info("Exclude table " + tablename + " from drop 
list, as the table belongs to cube " + cube.getName() + " with status " + 
cube.getStatus());
+                }
+            }
+        }
+
+        if (delete) {
+            // drop tables
+            ExecutorService executorService = 
Executors.newSingleThreadExecutor();
+            for (String htableName : allTablesNeedToBeDropped) {
+                FutureTask futureTask = new FutureTask(new 
DeleteHTableRunnable(hbaseAdmin, htableName));
+                executorService.execute(futureTask);
+                try {
+                    futureTask.get(TIME_THRESHOLD_DELETE_HTABLE, 
TimeUnit.MINUTES);
+                } catch (TimeoutException e) {
+                    logger.warn("Failed to delete HBase table " + htableName + ": the operation took more than " + TIME_THRESHOLD_DELETE_HTABLE + " minutes.");
+                    futureTask.cancel(true);
+                } catch (Exception e) {
+                    logger.error("Failed to delete HBase table " + htableName, e);
+                    futureTask.cancel(true);
+                }
+            }
+            executorService.shutdown();
+        } else {
+            System.out.println("--------------- Tables To Be Dropped 
---------------");
+            for (String htableName : allTablesNeedToBeDropped) {
+                System.out.println(htableName);
+            }
+            
System.out.println("----------------------------------------------------");
+        }
+
+        hbaseAdmin.close();
+    }
+
+    @Override
+    protected Options getOptions() {
+        Options options = new Options();
+        options.addOption(OPTION_DELETE);
+        options.addOption(OPTION_FORCE);
+        return options;
+    }
+
+    @Override
+    protected void execute(OptionsHelper optionsHelper) throws Exception {
+        logger.info("options: '" + optionsHelper.getOptionsAsString() + "'");
+        logger.info("delete option value: '" + 
optionsHelper.getOptionValue(OPTION_DELETE) + "'");
+        logger.info("force option value: '" + 
optionsHelper.getOptionValue(OPTION_FORCE) + "'");
+        delete = 
Boolean.parseBoolean(optionsHelper.getOptionValue(OPTION_DELETE));
+        force = 
Boolean.parseBoolean(optionsHelper.getOptionValue(OPTION_FORCE));
+
+        Configuration conf = HBaseConfiguration.create();
+
+        cleanUnusedIntermediateHiveTable(conf);
+        cleanUnusedHdfsFiles(conf);
+        cleanUnusedHBaseTables(conf);
+
+    }
+
+    class DeleteHTableRunnable implements Callable {
+        HBaseAdmin hbaseAdmin;
+        String htableName;
+
+        DeleteHTableRunnable(HBaseAdmin hbaseAdmin, String htableName) {
+            this.hbaseAdmin = hbaseAdmin;
+            this.htableName = htableName;
+        }
+
+        public Object call() throws Exception {
+            logger.info("Deleting HBase table " + htableName);
+            if (hbaseAdmin.tableExists(htableName)) {
+                if (hbaseAdmin.isTableEnabled(htableName)) {
+                    hbaseAdmin.disableTable(htableName);
+                }
+
+                hbaseAdmin.deleteTable(htableName);
+                logger.info("Deleted HBase table " + htableName);
+            } else {
+                logger.info("HBase table" + htableName + " does not exist");
+            }
+            return null;
+        }
+    }
+
+    private void cleanUnusedHdfsFiles(Configuration conf) throws IOException {
+        JobEngineConfig engineConfig = new 
JobEngineConfig(KylinConfig.getInstanceFromEnv());
+        CubeManager cubeMgr = 
CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
+
+        FileSystem fs = FileSystem.get(conf);
+        List<String> allHdfsPathsNeedToBeDeleted = new ArrayList<String>();
+        // GlobFilter filter = new
+        // 
GlobFilter(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory()
+        // + "/kylin-.*");
+        // TODO: on first use, /kylin/kylin_metadata may not exist yet.
+        FileStatus[] fStatus = fs.listStatus(new 
Path(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory()));
+        for (FileStatus status : fStatus) {
+            String path = status.getPath().getName();
+            // System.out.println(path);
+            if (path.startsWith("kylin-")) {
+                String kylinJobPath = engineConfig.getHdfsWorkingDirectory() + 
path;
+                allHdfsPathsNeedToBeDeleted.add(kylinJobPath);
+            }
+        }
+
+        List<String> allJobs = executableManager.getAllJobIds();
+        for (String jobId : allJobs) {
+            // only remove FINISHED and DISCARDED job intermediate files
+            final ExecutableState state = 
executableManager.getOutput(jobId).getState();
+            if (!state.isFinalState()) {
+                String path = 
JobBuilderSupport.getJobWorkingDir(engineConfig.getHdfsWorkingDirectory(), 
jobId);
+                allHdfsPathsNeedToBeDeleted.remove(path);
+                logger.info("Skip " + path + " from deletion list, as the path 
belongs to job " + jobId + " with status " + state);
+            }
+        }
+
+        // remove every segment working dir from deletion list
+        for (CubeInstance cube : cubeMgr.listAllCubes()) {
+            for (CubeSegment seg : cube.getSegments()) {
+                String jobUuid = seg.getLastBuildJobID();
+                if (jobUuid != null && !jobUuid.isEmpty()) {
+                    String path = 
JobBuilderSupport.getJobWorkingDir(engineConfig.getHdfsWorkingDirectory(), 
jobUuid);
+                    allHdfsPathsNeedToBeDeleted.remove(path);
+                    logger.info("Skip " + path + " from deletion list, as the 
path belongs to segment " + seg + " of cube " + cube.getName());
+                }
+            }
+        }
+
+        if (delete) {
+            // remove files
+            for (String hdfsPath : allHdfsPathsNeedToBeDeleted) {
+                logger.info("Deleting HDFS path " + hdfsPath);
+                Path p = new Path(hdfsPath);
+                if (fs.exists(p)) {
+                    fs.delete(p, true);
+                    logger.info("Deleted HDFS path " + hdfsPath);
+                } else {
+                    logger.info("HDFS path " + hdfsPath + " does not exist");
+                }
+            }
+        } else {
+            System.out.println("--------------- HDFS Path To Be Deleted 
---------------");
+            for (String hdfsPath : allHdfsPathsNeedToBeDeleted) {
+                System.out.println(hdfsPath);
+            }
+            
System.out.println("-------------------------------------------------------");
+        }
+
+    }
+
+    private void cleanUnusedIntermediateHiveTable(Configuration conf) throws 
IOException {
+        final KylinConfig config = KylinConfig.getInstanceFromEnv();
+        final CliCommandExecutor cmdExec = config.getCliCommandExecutor();
+        final int uuidLength = 36;
+        final String preFix = "kylin_intermediate_";
+        final String uuidPattern = 
"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}";
+
+        
+        final String useDatabaseHql = "USE " + 
config.getHiveDatabaseForIntermediateTable() + ";";
+        final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
+        hiveCmdBuilder.addStatement(useDatabaseHql);
+        hiveCmdBuilder.addStatement("show tables " + 
"\'kylin_intermediate_*\'" + "; ");
+
+        Pair<Integer, String> result = cmdExec.execute(hiveCmdBuilder.build());
+
+        String outputStr = result.getSecond();
+        BufferedReader reader = new BufferedReader(new 
StringReader(outputStr));
+        String line = null;
+        List<String> allJobs = executableManager.getAllJobIds();
+        List<String> allHiveTablesNeedToBeDeleted = new ArrayList<String>();
+        List<String> workingJobList = new ArrayList<String>();
+
+        StringBuilder sb = new StringBuilder();
+        for (String jobId : allJobs) {
+            // only remove intermediate tables of FINISHED and DISCARDED jobs
+            final ExecutableState state = executableManager.getOutput(jobId).getState();
+            if (!state.isFinalState()) {
+                workingJobList.add(jobId);
+                sb.append(jobId).append("(").append(state).append("), ");
+            }
+        }
+        logger.info("Working jobIDs: " + sb);
+
+        while ((line = reader.readLine()) != null) {
+
+            logger.info("Checking table " + line);
+
+            if (!line.startsWith(preFix))
+                continue;
+
+            if (force) {
+                logger.warn("Warning: force option is set, all intermediate hive tables will be deleted!");
+                allHiveTablesNeedToBeDeleted.add(line);
+                continue;
+            }
+
+            boolean isNeedDel = true;
+
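+            // intermediate table names end with a job/segment uuid (with '-' flattened to '_'); only tables whose uuid is not referenced by a running job are eligible for deletion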
+            if (line.length() > preFix.length() + uuidLength) {
+                String uuid = line.substring(line.length() - uuidLength, 
line.length());
+                uuid = uuid.replace("_", "-");
+                final Pattern UUId_PATTERN = Pattern.compile(uuidPattern);
+                if (UUId_PATTERN.matcher(uuid).matches()) {
+                    //Check whether it's a hive table in use
+                    if (isTableInUse(uuid, workingJobList)) {
+                        logger.info("Skip because not isTableInUse");
+                        isNeedDel = false;
+                    }
+                } else {
+                    logger.info("Skip because not match pattern");
+                    isNeedDel = false;
+                }
+            } else {
+                logger.info("Skip because length not qualified");
+                isNeedDel = false;
+            }
+
+            if (isNeedDel) {
+                allHiveTablesNeedToBeDeleted.add(line);
+            }
+        }
+
+        if (delete) {
+            hiveCmdBuilder.reset();
+            hiveCmdBuilder.addStatement(useDatabaseHql);
+            for (String delHive : allHiveTablesNeedToBeDeleted) {
+                hiveCmdBuilder.addStatement("drop table if exists " + delHive 
+ "; ");
+                logger.info("Remove " + delHive + " from hive tables.");
+            }
+
+            try {
+                cmdExec.execute(hiveCmdBuilder.build());
+            } catch (IOException e) {
+                e.printStackTrace();
+            }
+        } else {
+            System.out.println("------ Intermediate Hive Tables To Be Dropped 
------");
+            for (String hiveTable : allHiveTablesNeedToBeDeleted) {
+                System.out.println(hiveTable);
+            }
+            
System.out.println("----------------------------------------------------");
+        }
+
+        if (reader != null)
+            reader.close();
+    }
+
+    private boolean isTableInUse(String segUuid, List<String> workingJobList) {
+        for (String jobId : workingJobList) {
+            AbstractExecutable abstractExecutable = 
executableManager.getJob(jobId);
+            String segmentId = abstractExecutable.getParam("segmentId");
+
+            if (null == segmentId)
+                continue;
+
+            if (segUuid.equals(segmentId))
+                return true;
+        }
+        return false;
+    }
+
+    public static void main(String[] args) throws Exception {
+        StorageCleanupJob cli = new StorageCleanupJob();
+        cli.execute(args);
+    }
+}
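
Likewise, a usage sketch built from the options parsed above (the flags shown are placeholders, not part of this patch): without --delete true the job only reports the HBase tables, HDFS paths and intermediate Hive tables it would remove, and --force true additionally marks every kylin_intermediate_* Hive table for deletion regardless of whether a running job still references it.

    # dry run: report unused storage only
    ${KYLIN_HOME}/bin/kylin.sh org.apache.kylin.tool.StorageCleanupJob

    # delete unused storage, including all intermediate Hive tables (use with care)
    ${KYLIN_HOME}/bin/kylin.sh org.apache.kylin.tool.StorageCleanupJob --delete true --force true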
