KYLIN-2463, update and refine CubeMigrationCLI Signed-off-by: Hongbin Ma <mahong...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/6adb73d3 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/6adb73d3 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/6adb73d3 Branch: refs/heads/master-hbase0.98 Commit: 6adb73d3a286ca008e09cd1384cb4a183bc115ae Parents: cc1bac9 Author: Cheng Wang <cheng.w...@kyligence.io> Authored: Fri Feb 24 13:26:04 2017 +0800 Committer: Hongbin Ma <mahong...@apache.org> Committed: Fri Feb 24 17:22:54 2017 +0800 ---------------------------------------------------------------------- .../org/apache/kylin/tool/CubeMigrationCLI.java | 78 ++++++++++++-------- .../kylin/tool/CubeMigrationCheckCLI.java | 4 +- 2 files changed, 49 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/6adb73d3/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java ---------------------------------------------------------------------- diff --git a/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java b/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java index c0042f3..c162a76 100644 --- a/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java +++ b/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java @@ -59,8 +59,10 @@ import org.apache.kylin.dict.lookup.SnapshotTable; import org.apache.kylin.engine.mr.JobBuilderSupport; import org.apache.kylin.metadata.cachesync.Broadcaster; import org.apache.kylin.metadata.model.DataModelDesc; +import org.apache.kylin.metadata.model.IStorageAware; import org.apache.kylin.metadata.model.SegmentStatusEnum; import org.apache.kylin.metadata.model.TableDesc; +import org.apache.kylin.metadata.model.TableExtDesc; import org.apache.kylin.metadata.model.TableRef; import org.apache.kylin.metadata.project.ProjectInstance; import org.apache.kylin.metadata.realization.IRealizationConstants; @@ -82,13 +84,13 @@ public class CubeMigrationCLI { private static final Logger logger = LoggerFactory.getLogger(CubeMigrationCLI.class); - private static List<Opt> operations; - private static KylinConfig srcConfig; - private static KylinConfig dstConfig; - private static ResourceStore srcStore; - private static ResourceStore dstStore; - private static FileSystem hdfsFS; - private static HBaseAdmin hbaseAdmin; + private List<Opt> operations; + protected KylinConfig srcConfig; + protected KylinConfig dstConfig; + private ResourceStore srcStore; + private ResourceStore dstStore; + private FileSystem hdfsFS; + private HBaseAdmin hbaseAdmin; public static final String ACL_INFO_FAMILY = "i"; private static final String ACL_TABLE_NAME = "_acl"; @@ -96,21 +98,21 @@ public class CubeMigrationCLI { public static void main(String[] args) throws IOException, InterruptedException { + CubeMigrationCLI cli = new CubeMigrationCLI(); if (args.length != 8) { - usage(); + cli.usage(); System.exit(1); } - - moveCube(args[0], args[1], args[2], args[3], args[4], args[5], args[6], args[7]); + cli.moveCube(args[0], args[1], args[2], args[3], args[4], args[5], args[6], args[7]); } - private static void usage() { + protected void usage() { System.out.println("Usage: CubeMigrationCLI srcKylinConfigUri dstKylinConfigUri cubeName projectName copyAclOrNot purgeOrNot overwriteIfExists realExecute"); System.out.println(" srcKylinConfigUri: The KylinConfig of the cubeâs source \n" + "dstKylinConfigUri: The KylinConfig of the cubeâs new home \n" + "cubeName: the name of cube to be migrated. \n" + "projectName: The target project in the target environment.(Make sure it exist) \n" + "copyAclOrNot: true or false: whether copy cube ACL to target environment. \n" + "purgeOrNot: true or false: whether purge the cube from src server after the migration. \n" + "overwriteIfExists: overwrite cube if it already exists in the target environment. \n" + "realExecute: if false, just print the operations to take, if true, do the real migration. \n"); } - public static void moveCube(KylinConfig srcCfg, KylinConfig dstCfg, String cubeName, String projectName, String copyAcl, String purgeAndDisable, String overwriteIfExists, String realExecute) throws IOException, InterruptedException { + public void moveCube(KylinConfig srcCfg, KylinConfig dstCfg, String cubeName, String projectName, String copyAcl, String purgeAndDisable, String overwriteIfExists, String realExecute) throws IOException, InterruptedException { srcConfig = srcCfg; srcStore = ResourceStore.getStore(srcConfig); @@ -160,17 +162,17 @@ public class CubeMigrationCLI { } } - public static void moveCube(String srcCfgUri, String dstCfgUri, String cubeName, String projectName, String copyAcl, String purgeAndDisable, String overwriteIfExists, String realExecute) throws IOException, InterruptedException { + public void moveCube(String srcCfgUri, String dstCfgUri, String cubeName, String projectName, String copyAcl, String purgeAndDisable, String overwriteIfExists, String realExecute) throws IOException, InterruptedException { moveCube(KylinConfig.createInstanceFromUri(srcCfgUri), KylinConfig.createInstanceFromUri(dstCfgUri), cubeName, projectName, copyAcl, purgeAndDisable, overwriteIfExists, realExecute); } - public static void checkMigrationSuccess(KylinConfig kylinConfig, String cubeName, Boolean ifFix) throws IOException { + public void checkMigrationSuccess(KylinConfig kylinConfig, String cubeName, Boolean ifFix) throws IOException { CubeMigrationCheckCLI checkCLI = new CubeMigrationCheckCLI(kylinConfig, ifFix); checkCLI.execute(cubeName); } - private static String checkAndGetHbaseUrl() { + private String checkAndGetHbaseUrl() { String srcMetadataUrl = srcConfig.getMetadataUrl(); String dstMetadataUrl = dstConfig.getMetadataUrl(); @@ -192,7 +194,7 @@ public class CubeMigrationCLI { return srcHbaseUrl.trim(); } - private static void renameFoldersInHdfs(CubeInstance cube) { + protected void renameFoldersInHdfs(CubeInstance cube) throws IOException { for (CubeSegment segment : cube.getSegments()) { String jobUuid = segment.getLastBuildJobID(); @@ -204,17 +206,19 @@ public class CubeMigrationCLI { } - private static void changeHtableHost(CubeInstance cube) { + protected void changeHtableHost(CubeInstance cube) { + if (cube.getDescriptor().getEngineType() != IStorageAware.ID_SHARDED_HBASE) + return; for (CubeSegment segment : cube.getSegments()) { operations.add(new Opt(OptType.CHANGE_HTABLE_HOST, new Object[] { segment.getStorageLocationIdentifier() })); } } - private static void copyACL(CubeInstance cube, String projectName) { + private void copyACL(CubeInstance cube, String projectName) { operations.add(new Opt(OptType.COPY_ACL, new Object[] { cube.getUuid(), cube.getDescriptor().getModel().getUuid(), projectName })); } - private static void copyFilesInMetaStore(CubeInstance cube, String overwriteIfExists) throws IOException { + private void copyFilesInMetaStore(CubeInstance cube, String overwriteIfExists) throws IOException { List<String> metaItems = new ArrayList<String>(); Set<String> dictAndSnapshot = new HashSet<String>(); @@ -232,19 +236,19 @@ public class CubeMigrationCLI { } } - private static void addCubeAndModelIntoProject(CubeInstance srcCube, String cubeName, String projectName) throws IOException { + private void addCubeAndModelIntoProject(CubeInstance srcCube, String cubeName, String projectName) throws IOException { String projectResPath = ProjectInstance.concatResourcePath(projectName); if (!dstStore.exists(projectResPath)) - throw new IllegalStateException("The target project " + projectName + "does not exist"); + throw new IllegalStateException("The target project " + projectName + " does not exist"); operations.add(new Opt(OptType.ADD_INTO_PROJECT, new Object[] { srcCube, cubeName, projectName })); } - private static void purgeAndDisable(String cubeName) throws IOException { + private void purgeAndDisable(String cubeName) throws IOException { operations.add(new Opt(OptType.PURGE_AND_DISABLE, new Object[] { cubeName })); } - private static void listCubeRelatedResources(CubeInstance cube, List<String> metaResource, Set<String> dictAndSnapshot) throws IOException { + protected void listCubeRelatedResources(CubeInstance cube, List<String> metaResource, Set<String> dictAndSnapshot) throws IOException { CubeDesc cubeDesc = cube.getDescriptor(); metaResource.add(cube.getResourcePath()); @@ -253,6 +257,7 @@ public class CubeMigrationCLI { for (TableRef tableRef : cubeDesc.getModel().getAllTables()) { metaResource.add(TableDesc.concatResourcePath(tableRef.getTableIdentity())); + metaResource.add(TableExtDesc.concatResourcePath(tableRef.getTableIdentity())); } for (CubeSegment segment : cube.getSegments()) { @@ -262,11 +267,15 @@ public class CubeMigrationCLI { } } - private static enum OptType { + protected enum OptType { COPY_FILE_IN_META, COPY_DICT_OR_SNAPSHOT, RENAME_FOLDER_IN_HDFS, ADD_INTO_PROJECT, CHANGE_HTABLE_HOST, COPY_ACL, PURGE_AND_DISABLE } - private static class Opt { + protected void addOpt(OptType type, Object[] params) { + operations.add(new Opt(type, params)); + } + + private class Opt { private OptType type; private Object[] params; @@ -285,17 +294,17 @@ public class CubeMigrationCLI { } - private static void showOpts() { + private void showOpts() { for (int i = 0; i < operations.size(); ++i) { showOpt(operations.get(i)); } } - private static void showOpt(Opt opt) { + private void showOpt(Opt opt) { logger.info("Operation: " + opt.toString()); } - private static void doOpts() throws IOException, InterruptedException { + private void doOpts() throws IOException, InterruptedException { int index = 0; try { for (; index < operations.size(); ++index) { @@ -320,12 +329,13 @@ public class CubeMigrationCLI { } @SuppressWarnings("checkstyle:methodlength") - private static void doOpt(Opt opt) throws IOException, InterruptedException { + private void doOpt(Opt opt) throws IOException, InterruptedException { logger.info("Executing operation: " + opt.toString()); switch (opt.type) { case CHANGE_HTABLE_HOST: { String tableName = (String) opt.params[0]; + System.out.println("CHANGE_HTABLE_HOST, table name: " + tableName); HTableDescriptor desc = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName)); hbaseAdmin.disableTable(tableName); desc.setValue(IRealizationConstants.HTableTag, dstConfig.getMetadataUrlPrefix()); @@ -432,6 +442,10 @@ public class CubeMigrationCLI { Serializer<ProjectInstance> projectSerializer = new JsonSerializer<ProjectInstance>(ProjectInstance.class); ProjectInstance project = dstStore.getResource(projectResPath, ProjectInstance.class, projectSerializer); + for (TableRef tableRef : srcCube.getModel().getAllTables()) { + project.addTable(tableRef.getTableIdentity()); + } + project.addModel(modelName); project.removeRealization(RealizationType.CUBE, cubeName); project.addRealizationEntry(RealizationType.CUBE, cubeName); @@ -497,7 +511,7 @@ public class CubeMigrationCLI { } } - private static void undo(Opt opt) throws IOException, InterruptedException { + private void undo(Opt opt) throws IOException, InterruptedException { logger.info("Undo operation: " + opt.toString()); switch (opt.type) { @@ -559,7 +573,7 @@ public class CubeMigrationCLI { } } - private static void updateMeta(KylinConfig config) { + private void updateMeta(KylinConfig config) { String[] nodes = config.getRestServers(); for (String node : nodes) { RestClient restClient = new RestClient(node); @@ -572,7 +586,7 @@ public class CubeMigrationCLI { } } - private static void renameHDFSPath(String srcPath, String dstPath) throws IOException, InterruptedException { + private void renameHDFSPath(String srcPath, String dstPath) throws IOException, InterruptedException { int nRetry = 0; int sleepTime = 5000; while (!hdfsFS.rename(new Path(srcPath), new Path(dstPath))) { http://git-wip-us.apache.org/repos/asf/kylin/blob/6adb73d3/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCheckCLI.java ---------------------------------------------------------------------- diff --git a/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCheckCLI.java b/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCheckCLI.java index fe348ba..54fbbc0 100644 --- a/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCheckCLI.java +++ b/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCheckCLI.java @@ -35,6 +35,7 @@ import org.apache.kylin.common.util.OptionsHelper; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.metadata.model.IStorageAware; import org.apache.kylin.metadata.realization.IRealizationConstants; import org.apache.kylin.storage.hbase.HBaseConnection; import org.slf4j.Logger; @@ -139,7 +140,8 @@ public class CubeMigrationCheckCLI { List<String> segFullNameList = Lists.newArrayList(); CubeInstance cube = CubeManager.getInstance(dstCfg).getCube(cubeName); - addHTableNamesForCube(cube, segFullNameList); + if (cube.getDescriptor().getStorageType() == IStorageAware.ID_SHARDED_HBASE) + addHTableNamesForCube(cube, segFullNameList); check(segFullNameList); }