This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 2e2a4dff9ca9c08351084f53d293e189083db140 Author: Zhong, Yanghong <nju_y...@apache.org> AuthorDate: Tue May 19 11:14:42 2020 +0800 KYLIN-4489 Create a tool for migration cross clusters --- .../java/org/apache/kylin/common/KylinConfig.java | 20 + .../apache/kylin/common/restclient/RestClient.java | 40 +- .../kylin/metadata/model/DataModelManager.java | 3 + .../kylin/metadata/project/ProjectManager.java | 6 +- .../kylin/rest/controller/MigrationController.java | 2 +- pom.xml | 5 + .../kylin/rest/controller/AdminController.java | 22 + .../apache/kylin/rest/service/AdminService.java | 14 + .../apache/kylin/rest/service/HBaseInfoUtil.java | 17 +- .../apache/kylin/rest/service/ModelService.java | 8 +- .../kylin/storage/hbase/HBaseResourceStore.java | 7 +- .../storage/hbase/util/DeployCoprocessorCLI.java | 21 +- tool/pom.xml | 11 + .../apache/kylin/tool/migration/ClusterUtil.java | 165 +++++ .../migration/CubeMigrationCrossClusterCLI.java | 757 +++++++++++++++++++++ .../kylin/tool/migration/DstClusterUtil.java | 371 ++++++++++ .../kylin/tool/migration/SrcClusterUtil.java | 148 ++++ 17 files changed, 1595 insertions(+), 22 deletions(-) diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java index 7b0888b..c9001f0 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java @@ -20,6 +20,7 @@ package org.apache.kylin.common; import org.apache.commons.io.IOUtils; import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; import org.apache.kylin.common.restclient.RestClient; import org.apache.kylin.common.threadlocal.InternalThreadLocal; import org.apache.kylin.common.util.ClassUtil; @@ -571,6 +572,25 @@ public class KylinConfig extends KylinConfigBase { reloadKylinConfig(buildSiteProperties()); } + public static String getConfigAsString(Configuration conf) { + final StringBuilder sb = new StringBuilder(); + for (Map.Entry<String, String> entry : conf) { + sb.append(entry.getKey() + "=" + entry.getValue()).append('\n'); + } + return sb.toString(); + } + + public static Configuration getConfigFromString(String configInStr) throws IOException { + Properties props = new Properties(); + props.load(new StringReader(configInStr)); + + Configuration config = new Configuration(); + for (Map.Entry<Object, Object> entry : props.entrySet()) { + config.set((String) entry.getKey(), (String) entry.getValue()); + } + return config; + } + public KylinConfig base() { return this; } diff --git a/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java b/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java index d908f58..21b08f8 100644 --- a/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java +++ b/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java @@ -30,7 +30,6 @@ import java.util.regex.Pattern; import javax.xml.bind.DatatypeConverter; -import com.google.common.base.Strings; import org.apache.http.HttpResponse; import org.apache.http.auth.AuthScope; import org.apache.http.auth.UsernamePasswordCredentials; @@ -55,6 +54,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Strings; /** */ @@ -175,6 +175,25 @@ public class RestClient { } } + public void announceWipeCache(String entity, String event, String cacheKey) 
throws IOException { + String url = baseUrl + "/cache/announce/" + entity + "/" + cacheKey + "/" + event; + HttpPut request = new HttpPut(url); + + try { + HttpResponse response = client.execute(request); + + if (response.getStatusLine().getStatusCode() != 200) { + String msg = EntityUtils.toString(response.getEntity()); + throw new IOException("Invalid response " + response.getStatusLine().getStatusCode() + + " with announce cache wipe url " + url + "\n" + msg); + } + } catch (Exception ex) { + throw new IOException(ex); + } finally { + request.releaseConnection(); + } + } + public void wipeCache(String entity, String event, String cacheKey) throws IOException { HttpPut request; String url; @@ -202,8 +221,19 @@ public class RestClient { } public String getKylinProperties() throws IOException { - String url = baseUrl + "/admin/config"; - HttpGet request = new HttpGet(url); + return getConfiguration(baseUrl + "/admin/config", false); + } + + public String getHDFSConfiguration() throws IOException { + return getConfiguration(baseUrl + "/admin/config/hdfs", true); + } + + public String getHBaseConfiguration() throws IOException { + return getConfiguration(baseUrl + "/admin/config/hbase", true); + } + + private String getConfiguration(String url, boolean ifAuth) throws IOException { + HttpGet request = ifAuth ? newGet(url) : new HttpGet(url); HttpResponse response = null; try { response = client.execute(request); @@ -372,7 +402,7 @@ public class RestClient { String msg = getContent(response); Map<String, String> kvMap = JsonUtil.readValueAsMap(msg); String exception = kvMap.containsKey("exception") ? kvMap.get("exception") : "unknown"; - throw new IOException(exception); + throw new IOException("Error code: " + response.getStatusLine().getStatusCode() + "\n" + exception); } } finally { post.releaseConnection(); @@ -411,7 +441,7 @@ public class RestClient { } private HttpGet newGet(String url) { - HttpGet get = new HttpGet(); + HttpGet get = new HttpGet(url); addHttpHeaders(get); return get; } diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelManager.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelManager.java index 47e2c3d..f483a2c 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelManager.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelManager.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.Map; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.persistence.JsonSerializer; import org.apache.kylin.common.persistence.ResourceStore; import org.apache.kylin.common.persistence.Serializer; import org.apache.kylin.common.persistence.WriteConflictException; @@ -48,6 +49,8 @@ import org.apache.kylin.shaded.com.google.common.collect.Lists; public class DataModelManager { private static final Logger logger = LoggerFactory.getLogger(DataModelManager.class); + public static final Serializer<DataModelDesc> MODELDESC_SERIALIZER = new JsonSerializer<DataModelDesc>( + DataModelDesc.class); public static DataModelManager getInstance(KylinConfig config) { return config.getManager(DataModelManager.class); diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java index 15b6a2d..ebcd45c 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java +++ 
b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java @@ -30,7 +30,9 @@ import java.util.Set; import org.apache.commons.lang3.StringUtils; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.persistence.JsonSerializer; import org.apache.kylin.common.persistence.ResourceStore; +import org.apache.kylin.common.persistence.Serializer; import org.apache.kylin.common.util.AutoReadWriteLock; import org.apache.kylin.common.util.AutoReadWriteLock.AutoLock; import org.apache.kylin.metadata.TableMetadataManager; @@ -54,7 +56,9 @@ import org.apache.kylin.shaded.com.google.common.collect.Sets; public class ProjectManager { private static final Logger logger = LoggerFactory.getLogger(ProjectManager.class); - + public static final Serializer<ProjectInstance> PROJECT_SERIALIZER = new JsonSerializer<ProjectInstance>( + ProjectInstance.class); + public static ProjectManager getInstance(KylinConfig config) { return config.getManager(ProjectManager.class); } diff --git a/cube-migration/src/main/java/org/apache/kylin/rest/controller/MigrationController.java b/cube-migration/src/main/java/org/apache/kylin/rest/controller/MigrationController.java index efef5cf..45f83ea 100644 --- a/cube-migration/src/main/java/org/apache/kylin/rest/controller/MigrationController.java +++ b/cube-migration/src/main/java/org/apache/kylin/rest/controller/MigrationController.java @@ -155,7 +155,7 @@ public class MigrationController extends BasicController { } DataModelDesc dataModelDesc = JsonUtil.readValue(request.getModelDescData(), DataModelDesc.class); logger.info("Schema compatibility check for model {}", dataModelDesc.getName()); - modelService.checkModelCompatibility(request.getProjectName(), dataModelDesc, tableDescList); + modelService.checkModelCompatibility(dataModelDesc, tableDescList); logger.info("Pass schema compatibility check for model {}", dataModelDesc.getName()); } catch (Exception e) { logger.error(e.getMessage(), e); diff --git a/pom.xml b/pom.xml index c73bf28..f60b7c8 100644 --- a/pom.xml +++ b/pom.xml @@ -542,6 +542,11 @@ <optional>true</optional> </dependency> <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-distcp</artifactId> + <version>${hadoop2.version}</version> + </dependency> + <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>${zookeeper.version}</version> diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/AdminController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/AdminController.java index 4d90db8..780e069 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/controller/AdminController.java +++ b/server-base/src/main/java/org/apache/kylin/rest/controller/AdminController.java @@ -112,6 +112,28 @@ public class AdminController extends BasicController { return configRes; } + @RequestMapping(value = "/config/hdfs", method = { RequestMethod.GET }, produces = { "application/json" }) + @ResponseBody + public GeneralResponse getHDFSConfig() throws IOException { + String config = adminService.getHadoopConfigAsString(); + + GeneralResponse configRes = new GeneralResponse(); + configRes.put("config", config); + + return configRes; + } + + @RequestMapping(value = "/config/hbase", method = { RequestMethod.GET }, produces = { "application/json" }) + @ResponseBody + public GeneralResponse getHBaseConfig() throws IOException { + String config = adminService.getHBaseConfigAsString(); + + GeneralResponse configRes = new GeneralResponse(); + 
configRes.put("config", config); + + return configRes; + } + @RequestMapping(value = "/metrics/cubes", method = { RequestMethod.GET }, produces = { "application/json" }) @ResponseBody public MetricsResponse cubeMetrics(MetricsRequest request) { diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/AdminService.java b/server-base/src/main/java/org/apache/kylin/rest/service/AdminService.java index c133a28..4e9cd03 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/AdminService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/AdminService.java @@ -31,10 +31,12 @@ import org.apache.commons.configuration.ConfigurationException; import org.apache.commons.configuration.PropertiesConfiguration; import org.apache.commons.lang3.StringUtils; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.HadoopUtil; import org.apache.kylin.common.util.OrderedProperties; import org.apache.kylin.common.util.StringUtil; import org.apache.kylin.rest.constant.Constant; import org.apache.kylin.rest.job.StorageCleanupJob; +import org.apache.kylin.storage.hbase.HBaseConnection; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.security.access.prepost.PreAuthorize; @@ -117,4 +119,16 @@ public class AdminService extends BasicService { return KylinConfig.getInstanceFromEnv().exportToString(propertyKeys); } + + public String getHadoopConfigAsString() throws IOException { + logger.debug("Get Kylin Hadoop Config"); + + return KylinConfig.getConfigAsString(HadoopUtil.getCurrentConfiguration()); + } + + public String getHBaseConfigAsString() throws IOException { + logger.debug("Get Kylin HBase Config"); + + return KylinConfig.getConfigAsString(HBaseConnection.getCurrentHBaseConfiguration()); + } } diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/HBaseInfoUtil.java b/server-base/src/main/java/org/apache/kylin/rest/service/HBaseInfoUtil.java index 8fc1de9..012f14a 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/HBaseInfoUtil.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/HBaseInfoUtil.java @@ -20,6 +20,7 @@ package org.apache.kylin.rest.service; import java.io.IOException; import java.util.Map; +import java.util.Objects; import org.apache.hadoop.hbase.client.Connection; import org.apache.kylin.common.KylinConfig; @@ -28,13 +29,17 @@ import org.apache.kylin.storage.hbase.util.HBaseRegionSizeCalculator; import org.apache.kylin.storage.hbase.util.HBaseUnionUtil; public class HBaseInfoUtil { - + @SuppressWarnings("unused") // used by reflection public static HBaseResponse getHBaseInfo(String tableName, KylinConfig config) throws IOException { if (!config.getStorageUrl().getScheme().equals("hbase")) return null; - + Connection conn = HBaseUnionUtil.getConnection(config, tableName); + return getHBaseInfo(tableName, conn); + } + + public static HBaseResponse getHBaseInfo(String tableName, Connection conn) throws IOException { HBaseResponse hr = null; long tableSize = 0; int regionCount = 0; @@ -54,4 +59,12 @@ public class HBaseInfoUtil { hr.setRegionCount(regionCount); return hr; } + + public static boolean checkEquals(HBaseResponse hbaseR1, HBaseResponse hbaseR2) { + if (hbaseR1 == hbaseR2) + return true; + return Objects.equals(hbaseR1.getTableName(), hbaseR2.getTableName()) + && hbaseR1.getTableSize() == hbaseR2.getTableSize() + && hbaseR1.getRegionCount() == hbaseR2.getRegionCount(); + } } diff --git 
a/server-base/src/main/java/org/apache/kylin/rest/service/ModelService.java b/server-base/src/main/java/org/apache/kylin/rest/service/ModelService.java index 5a3a1ca..7ff3919 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/ModelService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/ModelService.java @@ -168,11 +168,7 @@ public class ModelService extends BasicService { result.raiseExceptionWhenInvalid(); } - public void checkModelCompatibility(String project, DataModelDesc dataModalDesc, List<TableDesc> tableDescList) { - ProjectInstance prjInstance = getProjectManager().getProject(project); - if (prjInstance == null) { - throw new BadRequestException("Project " + project + " does not exist"); - } + public void checkModelCompatibility(DataModelDesc dataModalDesc, List<TableDesc> tableDescList) { ModelSchemaUpdateChecker checker = new ModelSchemaUpdateChecker(getTableManager(), getCubeManager(), getDataModelManager()); @@ -181,7 +177,7 @@ public class ModelService extends BasicService { tableDescMap.put(tableDesc.getIdentity(), tableDesc); } dataModalDesc.init(getConfig(), tableDescMap); - ModelSchemaUpdateChecker.CheckResult result = checker.allowEdit(dataModalDesc, project, false); + ModelSchemaUpdateChecker.CheckResult result = checker.allowEdit(dataModalDesc, null, false); result.raiseExceptionWhenInvalid(); } diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java index dd63b52..e26cd74 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java @@ -28,6 +28,7 @@ import java.util.Map; import java.util.UUID; import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HTableDescriptor; @@ -91,10 +92,14 @@ public class HBaseResourceStore extends PushdownResourceStore { .parseInt(getConnection().getConfiguration().get("hbase.client.keyvalue.maxsize", "10485760")); } - Connection getConnection() throws IOException { + protected Connection getConnection() throws IOException { return HBaseConnection.get(metadataUrl); } + protected Configuration getCurrentHBaseConfiguration() { + return HBaseConnection.getCurrentHBaseConfiguration(); + } + private StorageURL buildMetadataUrl(KylinConfig kylinConfig) throws IOException { StorageURL url = kylinConfig.getMetadataUrl(); if (!url.getScheme().equals("hbase")) diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java index 845a4e5..52d1c14 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java @@ -420,7 +420,7 @@ public class DeployCoprocessorCLI { } public static Path getNewestCoprocessorJar(KylinConfig config, FileSystem fileSystem) throws IOException { - Path coprocessorDir = getCoprocessorHDFSDir(fileSystem, config); + Path coprocessorDir = getCoprocessorHDFSDir(fileSystem, getHDFSWorkingDirectory(config)); FileStatus newestJar = null; for (FileStatus fileStatus : fileSystem.listStatus(coprocessorDir)) { if (fileStatus.getPath().toString().endsWith(".jar")) { 
@@ -440,8 +440,14 @@ public class DeployCoprocessorCLI { return path; } - public static synchronized Path uploadCoprocessorJar(String localCoprocessorJar, FileSystem fileSystem, - Set<String> oldJarPaths) throws IOException { + public synchronized static Path uploadCoprocessorJar(String localCoprocessorJar, FileSystem fileSystem, + Set<String> oldJarPaths) throws IOException { + String hdfsWorkingDirectory = getHDFSWorkingDirectory(KylinConfig.getInstanceFromEnv()); + return uploadCoprocessorJar(localCoprocessorJar, fileSystem, hdfsWorkingDirectory, oldJarPaths); + } + + public synchronized static Path uploadCoprocessorJar(String localCoprocessorJar, FileSystem fileSystem, + String hdfsWorkingDirectory, Set<String> oldJarPaths) throws IOException { Path uploadPath = null; File localCoprocessorFile = new File(localCoprocessorJar); @@ -449,7 +455,7 @@ public class DeployCoprocessorCLI { if (oldJarPaths == null) { oldJarPaths = new HashSet<String>(); } - Path coprocessorDir = getCoprocessorHDFSDir(fileSystem, KylinConfig.getInstanceFromEnv()); + Path coprocessorDir = getCoprocessorHDFSDir(fileSystem, hdfsWorkingDirectory); for (FileStatus fileStatus : fileSystem.listStatus(coprocessorDir)) { if (isSame(localCoprocessorFile, fileStatus)) { uploadPath = fileStatus.getPath(); @@ -511,9 +517,12 @@ public class DeployCoprocessorCLI { return baseName; } - private static Path getCoprocessorHDFSDir(FileSystem fileSystem, KylinConfig config) throws IOException { + private static String getHDFSWorkingDirectory(KylinConfig config) { String hdfsWorkingDirectory = config.getHdfsWorkingDirectory(); - hdfsWorkingDirectory = HBaseConnection.makeQualifiedPathInHBaseCluster(hdfsWorkingDirectory); + return HBaseConnection.makeQualifiedPathInHBaseCluster(hdfsWorkingDirectory); + } + + private static Path getCoprocessorHDFSDir(FileSystem fileSystem, String hdfsWorkingDirectory) throws IOException { Path coprocessorDir = new Path(hdfsWorkingDirectory, "coprocessor"); fileSystem.mkdirs(coprocessorDir); return coprocessorDir; diff --git a/tool/pom.xml b/tool/pom.xml index 140ff93..8d4c2b4 100644 --- a/tool/pom.xml +++ b/tool/pom.xml @@ -76,6 +76,11 @@ <scope>provided</scope> </dependency> <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-server</artifactId> + <scope>provided</scope> + </dependency> + <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-yarn-api</artifactId> <scope>provided</scope> @@ -86,6 +91,12 @@ <scope>provided</scope> </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-distcp</artifactId> + <scope>provided</scope> + </dependency> + <!--Spring--> <dependency> <groupId>org.springframework</groupId> diff --git a/tool/src/main/java/org/apache/kylin/tool/migration/ClusterUtil.java b/tool/src/main/java/org/apache/kylin/tool/migration/ClusterUtil.java new file mode 100644 index 0000000..14f8e90 --- /dev/null +++ b/tool/src/main/java/org/apache/kylin/tool/migration/ClusterUtil.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.tool.migration; + +import java.io.IOException; +import java.net.URI; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.persistence.RawResource; +import org.apache.kylin.common.persistence.ResourceStore; +import org.apache.kylin.common.restclient.RestClient; +import org.apache.kylin.dict.DictionaryInfo; +import org.apache.kylin.dict.lookup.SnapshotTable; +import org.apache.kylin.engine.mr.JobBuilderSupport; +import org.apache.kylin.metadata.project.ProjectInstance; +import org.apache.kylin.rest.security.ManagedUser; +import org.apache.kylin.rest.service.KylinUserService; +import org.apache.kylin.storage.hbase.HBaseResourceStore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Strings; +import com.google.common.collect.Sets; + +public abstract class ClusterUtil { + private static final Logger logger = LoggerFactory.getLogger(ClusterUtil.class); + + protected final KylinConfig kylinConfig; + protected final RestClient restClient; + protected final String hdfsWorkingDirectory; + + protected final Configuration hbaseConf; + protected final Connection hbaseConn; + protected final ResourceStore resourceStore; + protected final Admin hbaseAdmin; + + final Configuration jobConf; + final FileSystem jobFS; + final String jobHdfsWorkingDirectoryQualified; + final FileSystem hbaseFS; + final String hbaseHdfsWorkingDirectoryQualified; + + public ClusterUtil(String configURI, boolean ifJobFSHAEnabled, boolean ifHBaseFSHAEnabled) throws IOException { + this.kylinConfig = KylinConfig.createInstanceFromUri(configURI); + this.restClient = new RestClient(configURI); + Path hdfsWorkingPath = Path.getPathWithoutSchemeAndAuthority(new Path(kylinConfig.getHdfsWorkingDirectory())); + String tmpHdfsWorkingDirectory = hdfsWorkingPath.toString(); + this.hdfsWorkingDirectory = tmpHdfsWorkingDirectory.endsWith("/") ? 
tmpHdfsWorkingDirectory + : tmpHdfsWorkingDirectory + "/"; + + this.jobConf = KylinConfig.getConfigFromString(restClient.getHDFSConfiguration()); + this.jobFS = FileSystem.get(jobConf); + this.jobHdfsWorkingDirectoryQualified = getQualifiedPath(jobConf, hdfsWorkingDirectory, ifJobFSHAEnabled); + + this.hbaseConf = KylinConfig.getConfigFromString(restClient.getHBaseConfiguration()); + this.hbaseFS = FileSystem.get(hbaseConf); + this.hbaseHdfsWorkingDirectoryQualified = getQualifiedPath(hbaseConf, hdfsWorkingDirectory, ifHBaseFSHAEnabled); + + this.hbaseConn = ConnectionFactory.createConnection(hbaseConf); + this.resourceStore = new HBaseResourceStore(kylinConfig) { + @Override + protected Connection getConnection() { + return hbaseConn; + } + + @Override + protected Configuration getCurrentHBaseConfiguration() { + return hbaseConf; + } + }; + this.hbaseAdmin = hbaseConn.getAdmin(); + } + + public abstract ProjectInstance getProject(String projName) throws IOException; + + public abstract DictionaryInfo getDictionaryInfo(String dictPath) throws IOException; + + public abstract SnapshotTable getSnapshotTable(String snapshotPath) throws IOException; + + public abstract String getRootDirQualifiedOfHTable(String tableName); + + public ManagedUser getUserDetails(String userKey) throws IOException { + return resourceStore.getResource(userKey, KylinUserService.SERIALIZER); + } + + public final RawResource getResource(String resPath) throws IOException { + return resourceStore.getResource(resPath); + } + + public String getJobWorkingDirQualified(String jobId) { + return JobBuilderSupport.getJobWorkingDir(jobHdfsWorkingDirectoryQualified, jobId); + } + + private static String getQualifiedPath(Configuration conf, String path, boolean ifHAEnabled) throws IOException { + String hdfsSchema = getReplacedDefaultFS(conf, !ifHAEnabled); + return hdfsSchema + path; + } + + private static String getReplacedDefaultFS(Configuration conf, boolean ifNeedReplace) throws IOException { + String defaultFS = conf.get(FileSystem.FS_DEFAULT_NAME_KEY); + if (!ifNeedReplace) { + return defaultFS; + } + + String nameServices = conf.get("dfs.nameservices"); + if (Strings.isNullOrEmpty(nameServices)) { + return defaultFS; + } + + // check whether name service is defined for the default fs + Set<String> nameServiceSet = Sets.newHashSet(nameServices.split(",")); + String defaultNameService = URI.create(defaultFS).getHost(); + if (!nameServiceSet.contains(defaultNameService)) { + logger.info("name service {} is not defined among {}", defaultNameService, nameServices); + return defaultFS; + } + + // select one usable node as the default fs + String haHostNames = conf.get("dfs.ha.namenodes." + defaultNameService); + if (!Strings.isNullOrEmpty(haHostNames)) { + conf = new Configuration(conf); + for (String oneNodeAlias : haHostNames.split(",")) { + String rpcNode = conf.get("dfs.namenode.rpc-address." + defaultNameService + "." + oneNodeAlias); + String replaced = "hdfs://" + rpcNode; + conf.set(FileSystem.FS_DEFAULT_NAME_KEY, replaced); + + Path rootPath = new Path(replaced + "/"); + FileSystem fs = FileSystem.get(conf); + try { + fs.getStatus(rootPath); + } catch (Exception e) { + logger.warn("cannot use {} as default fs due to ", replaced, e); + continue; + } + logger.info("replaced the default fs {} by {}", defaultFS, replaced); + return replaced; + } + throw new IllegalArgumentException("fail to replace the default fs " + defaultFS); + } + throw new IllegalArgumentException("dfs.ha.namenodes." 
+ defaultNameService + " is not set"); + } +} diff --git a/tool/src/main/java/org/apache/kylin/tool/migration/CubeMigrationCrossClusterCLI.java b/tool/src/main/java/org/apache/kylin/tool/migration/CubeMigrationCrossClusterCLI.java new file mode 100644 index 0000000..95efab0 --- /dev/null +++ b/tool/src/main/java/org/apache/kylin/tool/migration/CubeMigrationCrossClusterCLI.java @@ -0,0 +1,757 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.tool.migration; + +import java.io.File; +import java.io.IOException; +import java.nio.charset.Charset; +import java.nio.file.Files; +import java.util.Arrays; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.stream.Collectors; + +import org.apache.commons.cli.Option; +import org.apache.commons.cli.OptionBuilder; +import org.apache.commons.cli.OptionGroup; +import org.apache.commons.cli.Options; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.tools.DistCp; +import org.apache.hadoop.tools.DistCpConstants; +import org.apache.hadoop.tools.DistCpOptions; +import org.apache.hadoop.tools.OptionsParser; +import org.apache.kylin.common.persistence.RawResource; +import org.apache.kylin.common.util.AbstractApplication; +import org.apache.kylin.common.util.JsonUtil; +import org.apache.kylin.common.util.OptionsHelper; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.cube.model.CubeDesc; +import org.apache.kylin.cube.model.DictionaryDesc; +import org.apache.kylin.cube.model.SnapshotTableDesc; +import org.apache.kylin.dict.DictionaryInfo; +import org.apache.kylin.dict.GlobalDictionaryBuilder; +import org.apache.kylin.dict.lookup.ExtTableSnapshotInfo; +import org.apache.kylin.dict.lookup.SnapshotTable; +import org.apache.kylin.metadata.model.DataModelDesc; +import org.apache.kylin.metadata.model.SegmentStatusEnum; +import org.apache.kylin.metadata.model.TableDesc; +import org.apache.kylin.metadata.model.TableRef; +import org.apache.kylin.metadata.project.ProjectInstance; +import org.apache.kylin.metadata.project.RealizationEntry; +import org.apache.kylin.metadata.realization.IRealization; +import org.apache.kylin.metadata.realization.RealizationStatusEnum; +import org.apache.kylin.metadata.realization.RealizationType; +import 
org.apache.kylin.rest.response.HBaseResponse; +import org.apache.kylin.rest.security.ManagedUser; +import org.apache.kylin.rest.service.HBaseInfoUtil; +import org.apache.kylin.rest.service.KylinUserService; +import org.apache.kylin.rest.service.update.TableSchemaUpdateMapping; +import org.apache.kylin.rest.service.update.TableSchemaUpdater; +import org.apache.kylin.storage.hybrid.HybridInstance; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +public class CubeMigrationCrossClusterCLI extends AbstractApplication { + + private static final Logger logger = LoggerFactory.getLogger(CubeMigrationCrossClusterCLI.class); + + @SuppressWarnings("static-access") + public static final Option OPTION_KYLIN_URI_SRC = OptionBuilder.withArgName("kylinUriSrc").hasArg().isRequired(true) + .withDescription("Specify the source kylin uri with format user:pwd@host:port").create("kylinUriSrc"); + @SuppressWarnings("static-access") + public static final Option OPTION_KYLIN_URI_DST = OptionBuilder.withArgName("kylinUriDst").hasArg().isRequired(true) + .withDescription("Specify the destination kylin uri with format user:pwd@host:port").create("kylinUriDst"); + + @SuppressWarnings("static-access") + public static final Option OPTION_UPDATE_MAPPING = OptionBuilder.withArgName("updateMappingPath").hasArg() + .isRequired(false).withDescription("Specify the path for the update mapping file") + .create("updateMappingPath"); + + @SuppressWarnings("static-access") + public static final Option OPTION_CUBE = OptionBuilder.withArgName("cube").hasArg().isRequired(false) + .withDescription("Specify which cube to extract").create("cube"); + @SuppressWarnings("static-access") + public static final Option OPTION_HYBRID = OptionBuilder.withArgName("hybrid").hasArg().isRequired(false) + .withDescription("Specify which hybrid to extract").create("hybrid"); + @SuppressWarnings("static-access") + public static final Option OPTION_PROJECT = OptionBuilder.withArgName("project").hasArg().isRequired(false) + .withDescription("Specify realizations in which project to extract").create("project"); + @SuppressWarnings("static-access") + public static final Option OPTION_All = OptionBuilder.withArgName("all").hasArg(false).isRequired(false) + .withDescription("Specify realizations in all projects to extract").create("all"); + + @SuppressWarnings("static-access") + public static final Option OPTION_DST_HIVE_CHECK = OptionBuilder.withArgName("dstHiveCheck").hasArg() + .isRequired(false).withDescription("Specify whether to check destination hive tables") + .create("dstHiveCheck"); + + @SuppressWarnings("static-access") + public static final Option OPTION_OVERWRITE = OptionBuilder.withArgName("overwrite").hasArg().isRequired(false) + .withDescription("Specify whether to overwrite existing cubes").create("overwrite"); + + @SuppressWarnings("static-access") + public static final Option OPTION_SCHEMA_ONLY = OptionBuilder.withArgName("schemaOnly").hasArg().isRequired(false) + .withDescription("Specify whether only migrate cube related schema").create("schemaOnly"); + + @SuppressWarnings("static-access") + public static final Option OPTION_EXECUTE = OptionBuilder.withArgName("execute").hasArg().isRequired(false) + .withDescription("Specify whether it's to execute the migration").create("execute"); + + 
@SuppressWarnings("static-access") + public static final Option OPTION_COPROCESSOR_PATH = OptionBuilder.withArgName("coprocessorPath").hasArg() + .isRequired(false).withDescription("Specify the path of coprocessor to be deployed") + .create("coprocessorPath"); + + @SuppressWarnings("static-access") + public static final Option OPTION_FS_HA_ENABLED_CODE = OptionBuilder.withArgName("codeOfFSHAEnabled").hasArg() + .isRequired(false).withDescription("Specify whether to enable the namenode ha of clusters") + .create("codeOfFSHAEnabled"); + + @SuppressWarnings("static-access") + public static final Option OPTION_DISTCP_JOB_QUEUE = OptionBuilder.withArgName("distCpJobQueue").hasArg() + .isRequired(false).withDescription("Specify the mapreduce.job.queuename for DistCp job ") + .create("distCpJobQueue"); + + @SuppressWarnings("static-access") + public static final Option OPTION_DISTCP_JOB_MEMORY = OptionBuilder.withArgName("distCpJobMemory").hasArg() + .isRequired(false).withDescription("Specify the mapreduce.map.memory.mb for DistCp job ") + .create("distCpJobMemory"); + + @SuppressWarnings("static-access") + public static final Option OPTION_THREAD_NUM = OptionBuilder.withArgName("nThread").hasArg().isRequired(false) + .withDescription("Specify the number of threads for migrating cube data in parallel ").create("nThread"); + + protected final Options options; + + private Configuration distCpConf; + + protected SrcClusterUtil srcCluster; + protected DstClusterUtil dstCluster; + + private int codeOfFSHAEnabled = 3; + protected int nThread; + + private boolean ifDstHiveCheck = true; + private boolean ifSchemaOnly = true; + private boolean ifExecute = false; + private boolean ifOverwrite = false; + + private String coprocessorJarPath; + + private Set<CubeInstance> cubes = Sets.newHashSet(); + private Set<HybridInstance> hybrids = Sets.newHashSet(); + private Set<ProjectInstance> projects = Sets.newHashSet(); + + private Map<String, TableSchemaUpdateMapping> mappings = Maps.newHashMap(); + + private Map<String, ProjectInstance> dstProjects = Maps.newHashMap(); + + public CubeMigrationCrossClusterCLI() { + OptionGroup realizationOrProject = new OptionGroup(); + realizationOrProject.addOption(OPTION_CUBE); + realizationOrProject.addOption(OPTION_HYBRID); + realizationOrProject.addOption(OPTION_PROJECT); + realizationOrProject.addOption(OPTION_All); + realizationOrProject.setRequired(true); + + options = new Options(); + options.addOption(OPTION_KYLIN_URI_SRC); + options.addOption(OPTION_KYLIN_URI_DST); + options.addOption(OPTION_FS_HA_ENABLED_CODE); + options.addOption(OPTION_UPDATE_MAPPING); + options.addOptionGroup(realizationOrProject); + options.addOption(OPTION_DST_HIVE_CHECK); + options.addOption(OPTION_SCHEMA_ONLY); + options.addOption(OPTION_OVERWRITE); + options.addOption(OPTION_EXECUTE); + options.addOption(OPTION_COPROCESSOR_PATH); + options.addOption(OPTION_DISTCP_JOB_QUEUE); + options.addOption(OPTION_THREAD_NUM); + options.addOption(OPTION_DISTCP_JOB_MEMORY); + } + + protected Options getOptions() { + return options; + } + + public static boolean ifFSHAEnabled(int code, int pos) { + int which = 1 << pos; + return (code & which) == which; + } + + protected void init(OptionsHelper optionsHelper) throws Exception { + if (optionsHelper.hasOption(OPTION_UPDATE_MAPPING)) { + File mappingFile = new File(optionsHelper.getOptionValue(OPTION_UPDATE_MAPPING)); + String content = new String(Files.readAllBytes(mappingFile.toPath()), Charset.defaultCharset()); + Map<String, TableSchemaUpdateMapping> 
tmpMappings = JsonUtil.readValue(content, + new TypeReference<Map<String, TableSchemaUpdateMapping>>() { + }); + mappings = Maps.newHashMapWithExpectedSize(tmpMappings.size()); + for (Map.Entry<String, TableSchemaUpdateMapping> entry : tmpMappings.entrySet()) { + mappings.put(entry.getKey().toUpperCase(Locale.ROOT), entry.getValue()); + } + } + + ifDstHiveCheck = optionsHelper.hasOption(OPTION_DST_HIVE_CHECK) + ? Boolean.valueOf(optionsHelper.getOptionValue(OPTION_DST_HIVE_CHECK)) + : true; + ifSchemaOnly = optionsHelper.hasOption(OPTION_SCHEMA_ONLY) + ? Boolean.valueOf(optionsHelper.getOptionValue(OPTION_SCHEMA_ONLY)) + : true; + ifOverwrite = optionsHelper.hasOption(OPTION_OVERWRITE) + ? Boolean.valueOf(optionsHelper.getOptionValue(OPTION_OVERWRITE)) + : false; + ifExecute = optionsHelper.hasOption(OPTION_EXECUTE) + ? Boolean.valueOf(optionsHelper.getOptionValue(OPTION_EXECUTE)) + : false; + + codeOfFSHAEnabled = optionsHelper.hasOption(OPTION_FS_HA_ENABLED_CODE) + ? Integer.valueOf(optionsHelper.getOptionValue(OPTION_FS_HA_ENABLED_CODE)) + : 3; + + String srcConfigURI = optionsHelper.getOptionValue(OPTION_KYLIN_URI_SRC); + srcCluster = new SrcClusterUtil(srcConfigURI, ifFSHAEnabled(codeOfFSHAEnabled, 0), + ifFSHAEnabled(codeOfFSHAEnabled, 1)); + String dstConfigURI = optionsHelper.getOptionValue(OPTION_KYLIN_URI_DST); + dstCluster = new DstClusterUtil(dstConfigURI, ifFSHAEnabled(codeOfFSHAEnabled, 2), + ifFSHAEnabled(codeOfFSHAEnabled, 3), ifExecute); + + distCpConf = new Configuration(srcCluster.jobConf); + if (optionsHelper.hasOption(OPTION_DISTCP_JOB_QUEUE)) { + distCpConf.set("mapreduce.job.queuename", optionsHelper.getOptionValue(OPTION_DISTCP_JOB_QUEUE)); + } + int distCpMemory = optionsHelper.hasOption(OPTION_DISTCP_JOB_MEMORY) + ? Integer.valueOf(optionsHelper.getOptionValue(OPTION_DISTCP_JOB_MEMORY)) + : 1500; + int distCpJVMMemory = distCpMemory * 4 / 5; + distCpConf.set("mapreduce.map.memory.mb", "" + distCpMemory); + distCpConf.set("mapreduce.map.java.opts", + "-server -Xmx" + distCpJVMMemory + "m -Djava.net.preferIPv4Stack=true"); + + nThread = optionsHelper.hasOption(OPTION_THREAD_NUM) + ? Integer.valueOf(optionsHelper.getOptionValue(OPTION_THREAD_NUM)) + : 8; + + coprocessorJarPath = optionsHelper.hasOption(OPTION_COPROCESSOR_PATH) + ? 
optionsHelper.getOptionValue(OPTION_COPROCESSOR_PATH) + : srcCluster.getDefaultCoprocessorJarPath(); + } + + protected void execute(OptionsHelper optionsHelper) throws Exception { + init(optionsHelper); + + if (optionsHelper.hasOption(OPTION_All)) { + projects.addAll(srcCluster.listAllProjects()); + } else if (optionsHelper.hasOption(OPTION_PROJECT)) { + Set<String> projectNames = Sets.newHashSet(optionsHelper.getOptionValue(OPTION_PROJECT).split(",")); + for (String projectName : projectNames) { + ProjectInstance project = srcCluster.getProject(projectName); + if (project == null) { + throw new IllegalArgumentException("No project found with name of " + projectName); + } + projects.add(project); + } + } else if (optionsHelper.hasOption(OPTION_CUBE)) { + String cubeNames = optionsHelper.getOptionValue(OPTION_CUBE); + for (String cubeName : cubeNames.split(",")) { + CubeInstance cube = srcCluster.getCube(cubeName); + if (cube == null) { + throw new IllegalArgumentException("No cube found with name of " + cubeName); + } else { + cubes.add(cube); + } + } + } else if (optionsHelper.hasOption(OPTION_HYBRID)) { + String hybridNames = optionsHelper.getOptionValue(OPTION_HYBRID); + for (String hybridName : hybridNames.split(",")) { + HybridInstance hybridInstance = srcCluster.getHybrid(hybridName); + if (hybridInstance != null) { + hybrids.add(hybridInstance); + } else { + throw new IllegalArgumentException("No hybrid found with name of" + hybridName); + } + } + } + + if (!projects.isEmpty()) { + for (ProjectInstance project : projects) { + for (RealizationEntry entry : project.getRealizationEntries()) { + IRealization realization = srcCluster.getRealization(entry); + addRealization(realization); + } + } + } + if (!hybrids.isEmpty()) { + for (HybridInstance hybrid : hybrids) { + addHybrid(hybrid); + } + } + + Map<CubeInstance, Exception> failedCubes = Maps.newHashMap(); + + for (CubeInstance cube : cubes) { + logger.info("start to migrate cube {}", cube); + try { + migrateCube(cube); + logger.info("finish migrating cube {}", cube); + } catch (Exception e) { + logger.error("fail to migrate cube {} due to ", cube, e); + failedCubes.put(cube, e); + } + } + + for (HybridInstance hybrid : hybrids) { + dstCluster.saveHybrid(hybrid); + + // update project + ProjectInstance srcProject = srcCluster.getProjectByRealization(RealizationType.HYBRID, hybrid.getName()); + ProjectInstance dstProject = getDstProject(srcProject); + + // update hybrids + Set<RealizationEntry> projReals = Sets.newHashSet(dstProject.getRealizationEntries()); + projReals.add(RealizationEntry.create(RealizationType.HYBRID, hybrid.getName())); + dstProject.setRealizationEntries(Lists.newArrayList(projReals)); + + dstProjects.put(dstProject.getName(), dstProject); + } + + for (String projName : dstProjects.keySet()) { + dstCluster.saveProject(dstProjects.get(projName)); + } + + dstCluster.updateMeta(); + + if (failedCubes.isEmpty()) { + logger.info("Migration for cubes {}, hyrbids {} all succeed", cubes, hybrids); + } else { + logger.warn("Failed to migrate cubes {} and need to check the detailed reason and retry again!!!", + failedCubes.keySet()); + } + } + + private void migrateCube(CubeInstance cube) throws IOException { + if (!ifOverwrite && dstCluster.exists(CubeInstance.concatResourcePath(cube.getName()))) { + throw new RuntimeException(("The cube named " + cube.getName() + + " already exists on target metadata store. 
Please delete it firstly and try again")); + } + + ProjectInstance srcProject = srcCluster.getProjectByRealization(RealizationType.CUBE, cube.getName()); + + String descName = cube.getDescName(); + CubeDesc cubeDesc = srcCluster.getCubeDesc(descName); + + String modelName = cubeDesc.getModelName(); + DataModelDesc modelDesc = srcCluster.getDataModelDesc(modelName); + + Set<TableDesc> tableSet = Sets.newHashSet(); + for (TableRef tableRef : modelDesc.getAllTables()) { + TableDesc tableDescOld = srcCluster.getTableDesc(tableRef.getTableIdentity(), srcProject.getName()); + TableDesc tableDescUpdated = TableSchemaUpdater.dealWithMappingForTable(tableDescOld, mappings); + tableSet.add(tableDescUpdated); + } + + modelDesc = TableSchemaUpdater.dealWithMappingForModel(modelDesc, mappings); + + cubeDesc = TableSchemaUpdater.dealWithMappingForCubeDesc(cubeDesc, mappings); + + { // compatibility check before migrating to the destination cluster + dstCluster.checkCompatibility(srcProject.getName(), tableSet, modelDesc, ifDstHiveCheck); + } + + { + for (TableDesc table : tableSet) { + dstCluster.saveTableDesc(table); + } + + dstCluster.saveModelDesc(modelDesc); + + dstCluster.saveCubeDesc(cubeDesc); + + if (ifSchemaOnly) { + cube = CubeInstance.getCopyOf(cube); + cube.getSegments().clear(); + cube.resetSnapshots(); + cube.setStatus(RealizationStatusEnum.DISABLED); + cube.clearCuboids(); + } else { + // cube with global dictionary cannot be migrated with data + checkGlobalDict(cubeDesc); + + // delete those NEW segments and only keep the READY segments + cube.setSegments(cube.getSegments(SegmentStatusEnum.READY)); + + cube = TableSchemaUpdater.dealWithMappingForCube(cube, mappings); + + ExecutorService executor = Executors.newFixedThreadPool(nThread, new ThreadFactoryBuilder() + .setNameFormat("Cube-" + cube.getName() + "-data-migration-pool-%d").build()); + try { + List<Future<?>> futureList = migrateCubeData(cube, cubeDesc, executor); + executor.shutdown(); + for (Future<?> future : futureList) { + try { + future.get(); + } catch (InterruptedException e) { + logger.warn(e.getMessage()); + } catch (ExecutionException e) { + executor.shutdownNow(); + logger.error(e.getMessage()); + throw new RuntimeException(e); + } + } + } finally { + // in case that exceptions are thrown when call migrateCubeData() + if (!executor.isShutdown()) { + logger.warn("shut down executor for cube {}", cube); + executor.shutdownNow(); + } + } + } + + dstCluster.saveCubeInstance(cube); + } + + { + ProjectInstance dstProject = getDstProject(srcProject); + + // update tables in project + Set<String> projTables = Sets.newHashSet(dstProject.getTables()); + projTables.addAll(tableSet.stream().map(TableDesc::getIdentity).collect(Collectors.toSet())); + dstProject.setTables(projTables); + + // update models in project + Set<String> projModels = Sets.newHashSet(dstProject.getModels()); + projModels.add(modelName); + dstProject.setModels(Lists.newArrayList(projModels)); + + // update cubes in project + Set<RealizationEntry> projReals = Sets.newHashSet(dstProject.getRealizationEntries()); + projReals.add(RealizationEntry.create(RealizationType.CUBE, cube.getName())); + dstProject.setRealizationEntries(Lists.newArrayList(projReals)); + + dstProjects.put(dstProject.getName(), dstProject); + } + } + + private void checkGlobalDict(CubeDesc cubeDesc) { + if (cubeDesc.getDictionaries() != null && !cubeDesc.getDictionaries().isEmpty()) { + for (DictionaryDesc dictDesc : cubeDesc.getDictionaries()) { + if 
(GlobalDictionaryBuilder.class.getName().equalsIgnoreCase(dictDesc.getBuilderClass())) { + throw new RuntimeException("it's not supported to migrate global dictionaries " + dictDesc + + " for cube " + cubeDesc.getName()); + } + } + } + } + + private List<Future<?>> migrateCubeData(CubeInstance cube, CubeDesc cubeDesc, ExecutorService executor) + throws IOException { + List<Future<?>> futureList = Lists.newLinkedList(); + + for (final CubeSegment segment : cube.getSegments(SegmentStatusEnum.READY)) { + logger.info("start to migrate segment: {} {}", cube, segment.getName()); + copyMetaResource(segment.getStatisticsResourcePath()); + for (String dict : segment.getDictionaryPaths()) { + copyDictionary(cube, dict); + } + for (String snapshot : segment.getSnapshotPaths()) { + copySnapshot(cube, snapshot); + } + Future<?> future; + future = executor.submit(new MyRunnable() { + @Override + public void doRun() throws Exception { + copyHDFSJobInfo(segment.getLastBuildJobID()); + } + }); + futureList.add(future); + + future = executor.submit(new MyRunnable() { + @Override + public void doRun() throws Exception { + copyHTable(segment); + } + }); + futureList.add(future); + + logger.info("add segment {} to migration list", segment); + } + if (cubeDesc.getSnapshotTableDescList() != null) { + for (SnapshotTableDesc snapshotTable : cubeDesc.getSnapshotTableDescList()) { + if (snapshotTable.isGlobal()) { + String snapshotResPath = cube.getSnapshotResPath(snapshotTable.getTableName()); + if (snapshotTable.isExtSnapshotTable()) { + final ExtTableSnapshotInfo extSnapshot = srcCluster.getExtTableSnapshotInfo(snapshotResPath); + dstCluster.saveExtSnapshotTableInfo(extSnapshot); + if (ExtTableSnapshotInfo.STORAGE_TYPE_HBASE.equals(extSnapshot.getStorageType())) { + Future<?> future = executor.submit(new MyRunnable() { + @Override + public void doRun() throws Exception { + copyHTable(extSnapshot); + } + }); + futureList.add(future); + } + } else { + copySnapshot(cube, snapshotResPath); + } + logger.info("add cube-level snapshot table {} for cube {} to migration list", snapshotResPath, + cube); + } + } + } + + return futureList; + } + + private ProjectInstance getDstProject(ProjectInstance srcProject) throws IOException { + ProjectInstance dstProject = dstProjects.get(srcProject.getName()); + if (dstProject == null) { + dstProject = dstCluster.getProject(srcProject.getName()); + } + if (dstProject == null) { + dstProject = ProjectInstance.create(srcProject.getName(), srcProject.getOwner(), + srcProject.getDescription(), srcProject.getOverrideKylinProps(), null, null); + dstProject.setUuid(srcProject.getUuid()); + } + return dstProject; + } + + private void putUserInfo(String userName) throws IOException { + String userKey = KylinUserService.getId(userName); + ManagedUser user = srcCluster.getUserDetails(userKey); + if (user == null) { + logger.warn("Cannot find user {}", userName); + return; + } + dstCluster.saveUserInfo(userKey, user); + } + + private void copyMetaResource(String item) throws IOException { + RawResource res = srcCluster.getResource(item); + dstCluster.putResource(item, res); + res.content().close(); + } + + private void copyDictionary(CubeInstance cube, String dictPath) throws IOException { + if (dstCluster.exists(dictPath)) { + logger.info("Item {} has already existed in destination cluster", dictPath); + return; + } + DictionaryInfo dictInfo = srcCluster.getDictionaryInfo(dictPath); + String dupDict = dstCluster.saveDictionary(dictInfo); + if (dupDict != null) { + for (CubeSegment segment : 
cube.getSegments()) { + for (Map.Entry<String, String> entry : segment.getDictionaries().entrySet()) { + if (entry.getValue().equalsIgnoreCase(dictPath)) { + entry.setValue(dupDict); + } + } + } + logger.info("Item {} is dup, instead {} is reused", dictPath, dupDict); + } + } + + private void copySnapshot(CubeInstance cube, String snapshotTablePath) throws IOException { + if (dstCluster.exists(snapshotTablePath)) { + logger.info("Item {} has already existed in destination cluster", snapshotTablePath); + return; + } + SnapshotTable snapshotTable = srcCluster.getSnapshotTable(snapshotTablePath); + dstCluster.saveSnapshotTable(snapshotTable); + } + + private void copyHDFSJobInfo(String jobId) throws Exception { + String srcDirQualified = srcCluster.getJobWorkingDirQualified(jobId); + String dstDirQualified = dstCluster.getJobWorkingDirQualified(jobId); + if (ifExecute) { + dstCluster.copyInitOnJobCluster(new Path(dstDirQualified)); + copyHDFSPath(srcDirQualified, srcCluster.jobConf, dstDirQualified, dstCluster.jobConf); + } else { + logger.info("copied hdfs directory from {} to {}", srcDirQualified, dstDirQualified); + } + } + + private void copyHTable(CubeSegment segment) throws IOException { + String tableName = segment.getStorageLocationIdentifier(); + if (ifExecute) { + if (checkHTableExist(segment)) { + logger.info("htable {} has already existed in dst, will skip the migration", tableName); + } else { + copyHTable(tableName, true); + if (!checkHTableEquals(tableName)) { + logger.error("htable {} is copied to dst with different size!!!", tableName); + } + } + } + logger.info("migrated htable {} for segment {}", tableName, segment); + } + + private boolean checkHTableExist(CubeSegment segment) throws IOException { + String tableName = segment.getStorageLocationIdentifier(); + TableName htableName = TableName.valueOf(tableName); + if (!dstCluster.checkExist(htableName, segment)) { + return false; + } + + if (!checkHTableEquals(tableName)) { + logger.warn("although htable {} exists in destination, the details data are different", tableName); + dstCluster.deleteHTable(tableName); + return false; + } + return true; + } + + private boolean checkHTableEquals(String tableName) throws IOException { + HBaseResponse respSrc = HBaseInfoUtil.getHBaseInfo(tableName, srcCluster.hbaseConn); + HBaseResponse respDst = HBaseInfoUtil.getHBaseInfo(tableName, dstCluster.hbaseConn); + return HBaseInfoUtil.checkEquals(respSrc, respDst); + } + + private void copyHTable(ExtTableSnapshotInfo extTableSnapshotInfo) throws IOException { + String tableName = extTableSnapshotInfo.getStorageLocationIdentifier(); + if (ifExecute) { + TableName htableName = TableName.valueOf(tableName); + if (dstCluster.htableExists(htableName)) { + logger.warn("htable {} already exists in the dst cluster and will skip the htable migration"); + } else { + copyHTable(tableName, false); + } + } + logger.info("migrated htable {} for ext table snapshot {}", tableName, extTableSnapshotInfo.getTableName()); + } + + private void copyHTable(String tableName, boolean ifDeployCoprocessor) { + if (ifExecute) { + TableName htableName = TableName.valueOf(tableName); + try { + //migrate data first + copyHFileByDistCp(tableName); + + //create htable metadata, especially the split keys for predefining the regions + Table table = srcCluster.hbaseConn.getTable(TableName.valueOf(tableName)); + byte[][] endKeys = srcCluster.hbaseConn.getRegionLocator(htableName).getEndKeys(); + byte[][] splitKeys = Arrays.copyOfRange(endKeys, 0, endKeys.length - 1); + + 
HTableDescriptor tableDesc = new HTableDescriptor(table.getTableDescriptor()); + //change the table host + dstCluster.resetTableHost(tableDesc); + if (ifDeployCoprocessor) { + dstCluster.deployCoprocessor(tableDesc, coprocessorJarPath); + } + dstCluster.createTable(tableDesc, splitKeys); + + //do bulk load to sync up htable data and metadata + dstCluster.bulkLoadTable(tableName); + } catch (Exception e) { + logger.error("fail to migrate htable {} due to {} ", tableName, e); + throw new RuntimeException(e); + } + } + } + + protected void copyHFileByDistCp(String tableName) throws Exception { + String srcDirQualified = srcCluster.getRootDirQualifiedOfHTable(tableName); + String dstDirQualified = dstCluster.getRootDirQualifiedOfHTable(tableName); + dstCluster.copyInitOnHBaseCluster(new Path(dstDirQualified)); + copyHDFSPath(srcDirQualified, srcCluster.hbaseConf, dstDirQualified, dstCluster.hbaseConf); + } + + protected void copyHDFSPath(String srcDir, Configuration srcConf, String dstDir, Configuration dstConf) + throws Exception { + logger.info("start to copy hdfs directory from {} to {}", srcDir, dstDir); + DistCpOptions distCpOptions = OptionsParser.parse(new String[] { srcDir, dstDir }); + distCpOptions.preserve(DistCpOptions.FileAttribute.BLOCKSIZE); + distCpOptions.setBlocking(true); + setTargetPathExists(distCpOptions); + DistCp distCp = new DistCp(getConfOfDistCp(), distCpOptions); + distCp.execute(); + logger.info("copied hdfs directory from {} to {}", srcDir, dstDir); + } + + protected Configuration getConfOfDistCp() { + return distCpConf; + } + + /** + * Set targetPathExists in both inputOptions and job config, + * for the benefit of CopyCommitter + */ + public void setTargetPathExists(DistCpOptions inputOptions) throws IOException { + Path target = inputOptions.getTargetPath(); + FileSystem targetFS = target.getFileSystem(dstCluster.jobConf); + boolean targetExists = targetFS.exists(target); + inputOptions.setTargetPathExists(targetExists); + dstCluster.jobConf.setBoolean(DistCpConstants.CONF_LABEL_TARGET_PATH_EXISTS, targetExists); + } + + private void addHybrid(HybridInstance hybrid) { + hybrids.add(hybrid); + for (IRealization realization : hybrid.getRealizations()) { + addRealization(realization); + } + } + + private void addRealization(IRealization realization) { + if (realization instanceof HybridInstance) { + addHybrid((HybridInstance) realization); + } else if (realization instanceof CubeInstance) { + cubes.add((CubeInstance) realization); + } else { + logger.warn("Realization {} is neither hybrid nor cube", realization); + } + } + + private static abstract class MyRunnable implements Runnable { + @Override + public void run() { + try { + doRun(); + } catch (RuntimeException e) { + throw e; + } catch (Throwable e) { + throw new RuntimeException(e); + } + } + + public abstract void doRun() throws Exception; + } + + public static void main(String[] args) { + CubeMigrationCrossClusterCLI cli = new CubeMigrationCrossClusterCLI(); + cli.execute(args); + } +} \ No newline at end of file diff --git a/tool/src/main/java/org/apache/kylin/tool/migration/DstClusterUtil.java b/tool/src/main/java/org/apache/kylin/tool/migration/DstClusterUtil.java new file mode 100644 index 0000000..e578935 --- /dev/null +++ b/tool/src/main/java/org/apache/kylin/tool/migration/DstClusterUtil.java @@ -0,0 +1,371 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.tool.migration; + +import static org.apache.kylin.metadata.realization.IRealizationConstants.HTableSegmentTag; + +import java.io.IOException; +import java.util.List; +import java.util.NavigableSet; +import java.util.Set; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.util.ToolRunner; +import org.apache.kylin.common.persistence.RawResource; +import org.apache.kylin.common.persistence.RootPersistentEntity; +import org.apache.kylin.common.persistence.Serializer; +import org.apache.kylin.common.util.HadoopUtil; +import org.apache.kylin.common.util.JsonUtil; +import org.apache.kylin.cube.CubeDescManager; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.cube.model.CubeDesc; +import org.apache.kylin.dict.DictionaryInfo; +import org.apache.kylin.dict.DictionaryInfoSerializer; +import org.apache.kylin.dict.lookup.ExtTableSnapshotInfo; +import org.apache.kylin.dict.lookup.ExtTableSnapshotInfoManager; +import org.apache.kylin.dict.lookup.SnapshotTable; +import org.apache.kylin.dict.lookup.SnapshotTableSerializer; +import org.apache.kylin.metadata.TableMetadataManager; +import org.apache.kylin.metadata.cachesync.Broadcaster; +import org.apache.kylin.metadata.model.DataModelDesc; +import org.apache.kylin.metadata.model.DataModelManager; +import org.apache.kylin.metadata.model.TableDesc; +import org.apache.kylin.metadata.project.ProjectInstance; +import org.apache.kylin.metadata.project.ProjectManager; +import org.apache.kylin.metadata.realization.IRealizationConstants; +import org.apache.kylin.rest.security.ManagedUser; +import org.apache.kylin.rest.service.KylinUserService; +import org.apache.kylin.storage.hbase.util.DeployCoprocessorCLI; +import org.apache.kylin.storage.hybrid.HybridInstance; +import org.apache.kylin.storage.hybrid.HybridManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Lists; + +public class DstClusterUtil extends ClusterUtil { + private static final Logger logger = LoggerFactory.getLogger(DstClusterUtil.class); + + public static final String hbaseSubDir = "migration/hbase/data/default/"; + + private final String hbaseDataDirQualified; + private final String hbaseDataDir; + + private final boolean ifExecute; + + public DstClusterUtil(String configURI, boolean ifExecute) throws IOException { + this(configURI, true, true, 
ifExecute); + } + + public DstClusterUtil(String configURI, boolean ifJobFSHAEnabled, boolean ifHBaseFSHAEnabled, boolean ifExecute) + throws IOException { + super(configURI, ifJobFSHAEnabled, ifHBaseFSHAEnabled); + this.hbaseDataDirQualified = hbaseHdfsWorkingDirectoryQualified + hbaseSubDir; + this.hbaseDataDir = hdfsWorkingDirectory + hbaseSubDir; + this.ifExecute = ifExecute; + } + + @Override + public ProjectInstance getProject(String projName) throws IOException { + return resourceStore.getResource(ProjectInstance.concatResourcePath(projName), + ProjectManager.PROJECT_SERIALIZER); + } + + @Override + public DictionaryInfo getDictionaryInfo(String dictPath) throws IOException { + return resourceStore.getResource(dictPath, DictionaryInfoSerializer.FULL_SERIALIZER); + } + + @Override + public SnapshotTable getSnapshotTable(String snapshotPath) throws IOException { + return resourceStore.getResource(snapshotPath, SnapshotTableSerializer.FULL_SERIALIZER); + } + + @Override + public String getRootDirQualifiedOfHTable(String tableName) { + return hbaseDataDirQualified + tableName; + } + + private String getRootDirOfHTable(String tableName) { + return hbaseDataDir + tableName; + } + + public boolean exists(String resPath) throws IOException { + return resourceStore.exists(resPath); + } + + public void checkCompatibility(String projectName, Set<TableDesc> tableSet, DataModelDesc modelDesc, + boolean ifHiveCheck) throws IOException { + List<String> tableDataList = Lists.newArrayList(); + for (TableDesc table : tableSet) { + tableDataList.add(JsonUtil.writeValueAsIndentString(table)); + } + + String modelDescData = JsonUtil.writeValueAsIndentString(modelDesc); + + CompatibilityCheckRequest request = new CompatibilityCheckRequest(); + request.setProjectName(projectName); + request.setTableDescDataList(tableDataList); + request.setModelDescData(modelDescData); + + String jsonRequest = JsonUtil.writeValueAsIndentString(request); + restClient.checkCompatibility(jsonRequest, ifHiveCheck); + } + + public void saveProject(ProjectInstance projInstance) throws IOException { + if (ifExecute) { + putMetaResource(ProjectInstance.concatResourcePath(projInstance.getName()), projInstance, + ProjectManager.PROJECT_SERIALIZER); + } + logger.info("saved project {}", projInstance); + } + + public void saveHybrid(HybridInstance hybridInstance) throws IOException { + if (ifExecute) { + putMetaResource(HybridInstance.concatResourcePath(hybridInstance.getName()), hybridInstance, + HybridManager.HYBRID_SERIALIZER); + } + logger.info("saved hybrid {}", hybridInstance); + } + + public void saveTableDesc(TableDesc table) throws IOException { + if (ifExecute) { + putMetaResource(TableDesc.concatResourcePath(table.getIdentity(), table.getProject()), table, + TableMetadataManager.TABLE_SERIALIZER); + } + logger.info("saved table {}", table); + } + + public void saveModelDesc(DataModelDesc modelDesc) throws IOException { + if (ifExecute) { + putMetaResource(DataModelDesc.concatResourcePath(modelDesc.getName()), modelDesc, + DataModelManager.MODELDESC_SERIALIZER); + } + logger.info("saved model {}", modelDesc); + } + + public void saveCubeDesc(CubeDesc cubeDesc) throws IOException { + if (ifExecute) { + putMetaResource(CubeDesc.concatResourcePath(cubeDesc.getName()), cubeDesc, + CubeDescManager.CUBE_DESC_SERIALIZER); + } + logger.info("saved cube desc {}", cubeDesc); + } + + public void saveCubeInstance(CubeInstance cube) throws IOException { + if (ifExecute) { + 
putMetaResource(CubeInstance.concatResourcePath(cube.getName()), cube, CubeManager.CUBE_SERIALIZER); + } + logger.info("saved cube instance {}", cube); + } + + public String saveDictionary(DictionaryInfo dictInfo) throws IOException { + String dupDict = checkDupDict(dictInfo); + if (dupDict == null) { + putMetaResource(dictInfo.getResourcePath(), dictInfo, DictionaryInfoSerializer.FULL_SERIALIZER); + logger.info("saved dictionary {}", dictInfo.getResourcePath()); + } + return dupDict; + } + + private String checkDupDict(DictionaryInfo dictInfo) throws IOException { + NavigableSet<String> existings = resourceStore.listResources(dictInfo.getResourceDir()); + if (existings == null) + return null; + + logger.info("{} existing dictionaries of the same column", existings.size()); + if (existings.size() > 100) { + logger.warn("Too many dictionaries under {}, dict count: {}", dictInfo.getResourceDir(), existings.size()); + } + + for (String existing : existings) { + DictionaryInfo existingInfo = getDictionaryInfo(existing); + if (existingInfo != null && dictInfo.getDictionaryObject().equals(existingInfo.getDictionaryObject())) { + return existing; + } + } + + return null; + } + + public void saveSnapshotTable(SnapshotTable snapshotTable) throws IOException { + putMetaResource(snapshotTable.getResourcePath(), snapshotTable, SnapshotTableSerializer.FULL_SERIALIZER); + logger.info("saved snapshot table {}", snapshotTable.getResourcePath()); + } + + public void saveExtSnapshotTableInfo(ExtTableSnapshotInfo extTableSnapshotInfo) throws IOException { + putMetaResource(extTableSnapshotInfo.getResourcePath(), extTableSnapshotInfo, + ExtTableSnapshotInfoManager.SNAPSHOT_SERIALIZER); + logger.info("saved ext snapshot table info {}", extTableSnapshotInfo.getResourcePath()); + } + + public void saveUserInfo(String userKey, ManagedUser user) throws IOException { + if (ifExecute) { + putMetaResource(userKey, user, KylinUserService.SERIALIZER); + } + logger.info("saved user info {}", userKey); + } + + private <T extends RootPersistentEntity> void putMetaResource(String resPath, T obj, Serializer<T> serializer) + throws IOException { + putMetaResource(resPath, obj, serializer, true); + } + + public <T extends RootPersistentEntity> void putMetaResource(String resPath, T obj, Serializer<T> serializer, + boolean withoutCheck) throws IOException { + if (ifExecute) { + if (withoutCheck) { + resourceStore.putResource(resPath, obj, System.currentTimeMillis(), serializer); + } else { + resourceStore.checkAndPutResource(resPath, obj, System.currentTimeMillis(), serializer); + } + } + logger.info("saved resource {}", resPath); + } + + public void putResource(String resPath, RawResource res) throws IOException { + if (ifExecute) { + resourceStore.putResource(resPath, res.content(), res.lastModified()); + } + logger.info("saved resource {}", resPath); + } + + // if htable does not exist in dst, return false; + // if htable exists in dst, and the segment tags are the same, if the htable is enabled, then return true; + // else delete the htable and return false; + // else the htable is used by others, should throw runtime exception + public boolean checkExist(TableName htableName, CubeSegment segment) throws IOException { + if (!htableExists(htableName)) { + return false; + } + Table table = hbaseConn.getTable(htableName); + HTableDescriptor tableDesc = table.getTableDescriptor(); + if (segment.toString().equals(tableDesc.getValue(HTableSegmentTag))) { + if (hbaseAdmin.isTableEnabled(htableName)) { + return true; + } else { + 
hbaseAdmin.deleteTable(htableName); + logger.info("htable {} is deleted", htableName); + return false; + } + } + throw new RuntimeException( + "htable name " + htableName + " has been used by " + tableDesc.getValue(HTableSegmentTag)); + } + + public void deleteHTable(String tableName) throws IOException { + TableName htableName = TableName.valueOf(tableName); + if (hbaseAdmin.isTableEnabled(htableName)) { + hbaseAdmin.disableTable(htableName); + } + hbaseAdmin.deleteTable(htableName); + logger.info("htable {} is deleted", htableName); + } + + public boolean htableExists(TableName htableName) throws IOException { + return hbaseAdmin.tableExists(htableName); + } + + public void resetTableHost(HTableDescriptor tableDesc) { + tableDesc.setValue(IRealizationConstants.HTableTag, kylinConfig.getMetadataUrlPrefix()); + } + + public void deployCoprocessor(HTableDescriptor tableDesc, String localCoprocessorJar) throws IOException { + List<String> existingCoprocessors = tableDesc.getCoprocessors(); + for (String existingCoprocessor : existingCoprocessors) { + tableDesc.removeCoprocessor(existingCoprocessor); + } + + Path hdfsCoprocessorJar = DeployCoprocessorCLI.uploadCoprocessorJar(localCoprocessorJar, hbaseFS, + hdfsWorkingDirectory, null); + + if (User.isHBaseSecurityEnabled(hbaseConf)) { + // add coprocessor for bulk load + tableDesc.addCoprocessor("org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint"); + } + DeployCoprocessorCLI.addCoprocessorOnHTable(tableDesc, hdfsCoprocessorJar); + + logger.info("deployed hbase table {} with coprocessor.", tableDesc.getTableName()); + } + + public void createTable(HTableDescriptor tableDesc, byte[][] splitKeys) throws IOException { + hbaseAdmin.createTable(tableDesc, splitKeys); + + logger.info("htable {} successfully created!", tableDesc.getTableName()); + } + + public void copyInitOnJobCluster(Path path) throws IOException { + copyInit(jobFS, path); + } + + public void copyInitOnHBaseCluster(Path path) throws IOException { + copyInit(hbaseFS, path); + } + + public static void copyInit(FileSystem fs, Path path) throws IOException { + path = Path.getPathWithoutSchemeAndAuthority(path); + Path pathP = path.getParent(); + if (!fs.exists(pathP)) { + fs.mkdirs(pathP); + } + if (fs.exists(path)) { + logger.warn("path {} already exists and will be deleted", path); + HadoopUtil.deletePath(fs.getConf(), path); + } + } + + public void bulkLoadTable(String tableName) throws Exception { + Path rootPathOfTable = new Path(getRootDirOfHTable(tableName)); + FileStatus[] regionFiles = hbaseFS.listStatus(rootPathOfTable, new PathFilter() { + @Override + public boolean accept(Path path) { + return !path.getName().startsWith("."); + } + }); + + for (FileStatus regionFileStatus : regionFiles) { + ToolRunner.run(new LoadIncrementalHFiles(hbaseConf), + new String[] { regionFileStatus.getPath().toString(), tableName }); + } + + logger.info("succeeded in migrating htable {}", tableName); + } + + public void updateMeta() { + if (ifExecute) { + try { + logger.info("update meta cache for {}", restClient); + restClient.announceWipeCache(Broadcaster.SYNC_ALL, Broadcaster.Event.UPDATE.getType(), + Broadcaster.SYNC_ALL); + } catch (IOException e) { + logger.error(e.getMessage(), e); + } + } + } +} diff --git a/tool/src/main/java/org/apache/kylin/tool/migration/SrcClusterUtil.java b/tool/src/main/java/org/apache/kylin/tool/migration/SrcClusterUtil.java new file mode 100644 index 0000000..7f865d5 --- /dev/null +++ b/tool/src/main/java/org/apache/kylin/tool/migration/SrcClusterUtil.java 
@@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.tool.migration; + +import java.io.IOException; +import java.util.List; + +import org.apache.kylin.cube.CubeDescManager; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.cube.model.CubeDesc; +import org.apache.kylin.dict.DictionaryInfo; +import org.apache.kylin.dict.DictionaryManager; +import org.apache.kylin.dict.lookup.ExtTableSnapshotInfo; +import org.apache.kylin.dict.lookup.ExtTableSnapshotInfoManager; +import org.apache.kylin.dict.lookup.SnapshotManager; +import org.apache.kylin.dict.lookup.SnapshotTable; +import org.apache.kylin.metadata.TableMetadataManager; +import org.apache.kylin.metadata.model.DataModelDesc; +import org.apache.kylin.metadata.model.DataModelManager; +import org.apache.kylin.metadata.model.TableDesc; +import org.apache.kylin.metadata.project.ProjectInstance; +import org.apache.kylin.metadata.project.ProjectManager; +import org.apache.kylin.metadata.project.RealizationEntry; +import org.apache.kylin.metadata.realization.IRealization; +import org.apache.kylin.metadata.realization.RealizationRegistry; +import org.apache.kylin.metadata.realization.RealizationType; +import org.apache.kylin.storage.hybrid.HybridInstance; +import org.apache.kylin.storage.hybrid.HybridManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SrcClusterUtil extends ClusterUtil { + private static final Logger logger = LoggerFactory.getLogger(SrcClusterUtil.class); + + private static final String hbaseRootDirConfKey = "hbase.rootdir"; + private final String hbaseDataDir; + + private final TableMetadataManager metadataManager; + private final DataModelManager modelManager; + private final ProjectManager projectManager; + private final HybridManager hybridManager; + private final CubeManager cubeManager; + private final CubeDescManager cubeDescManager; + private final RealizationRegistry realizationRegistry; + private final DictionaryManager dictionaryManager; + private final SnapshotManager snapshotManager; + private final ExtTableSnapshotInfoManager extSnapshotInfoManager; + + public SrcClusterUtil(String configURI, boolean ifJobFSHAEnabled, boolean ifHBaseFSHAEnabled) throws IOException { + super(configURI, ifJobFSHAEnabled, ifHBaseFSHAEnabled); + + this.hbaseDataDir = hbaseConf.get(hbaseRootDirConfKey) + "/data/default/"; + metadataManager = TableMetadataManager.getInstance(kylinConfig); + modelManager = DataModelManager.getInstance(kylinConfig); + projectManager = ProjectManager.getInstance(kylinConfig); + hybridManager = HybridManager.getInstance(kylinConfig); + cubeManager = CubeManager.getInstance(kylinConfig); + cubeDescManager = 
CubeDescManager.getInstance(kylinConfig); + realizationRegistry = RealizationRegistry.getInstance(kylinConfig); + dictionaryManager = DictionaryManager.getInstance(kylinConfig); + snapshotManager = SnapshotManager.getInstance(kylinConfig); + extSnapshotInfoManager = ExtTableSnapshotInfoManager.getInstance(kylinConfig); + } + + public String getDefaultCoprocessorJarPath() { + return kylinConfig.getCoprocessorLocalJar(); + } + + @Override + public ProjectInstance getProject(String projectName) throws IOException { + return projectManager.getProject(projectName); + } + + public List<ProjectInstance> listAllProjects() throws IOException { + return projectManager.listAllProjects(); + } + + public ProjectInstance getProjectByRealization(RealizationType type, String realizationName) throws IOException { + List<ProjectInstance> ret = projectManager.findProjects(type, realizationName); + return ret.isEmpty() ? null : ret.get(0); + } + + public CubeInstance getCube(String name) throws IOException { + return cubeManager.getCube(name); + } + + public CubeDesc getCubeDesc(String name) throws IOException { + return cubeDescManager.getCubeDesc(name); + } + + public HybridInstance getHybrid(String name) throws IOException { + return hybridManager.getHybridInstance(name); + } + + public IRealization getRealization(RealizationEntry entry) throws IOException { + return realizationRegistry.getRealization(entry.getType(), entry.getRealization()); + } + + public DataModelDesc getDataModelDesc(String modelName) throws IOException { + return modelManager.getDataModelDesc(modelName); + } + + public TableDesc getTableDesc(String tableIdentity, String projectName) throws IOException { + TableDesc ret = metadataManager.getStore().getResource(TableDesc.concatResourcePath(tableIdentity, projectName), + TableMetadataManager.TABLE_SERIALIZER); + if (projectName != null && ret == null) { + ret = metadataManager.getStore().getResource(TableDesc.concatResourcePath(tableIdentity, null), + TableMetadataManager.TABLE_SERIALIZER); + } + return ret; + } + + @Override + public DictionaryInfo getDictionaryInfo(String dictPath) throws IOException { + return dictionaryManager.getDictionaryInfo(dictPath); + } + + @Override + public SnapshotTable getSnapshotTable(String snapshotPath) throws IOException { + return snapshotManager.getSnapshotTable(snapshotPath); + } + + public ExtTableSnapshotInfo getExtTableSnapshotInfo(String snapshotPath) throws IOException { + return extSnapshotInfoManager.getSnapshot(snapshotPath); + } + + @Override + public String getRootDirQualifiedOfHTable(String tableName) { + return hbaseDataDir + tableName; + } +} \ No newline at end of file
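
Note for reviewers: the dictionary path in this patch avoids writing a second copy of an identical dictionary into the destination metastore (DstClusterUtil.checkDupDict / saveDictionary) and then rewrites segment dictionary references to the reused path (the loop over segment.getDictionaries() at the top of CubeMigrationCrossClusterCLI shown above). The following is a minimal, self-contained sketch of that dedup-and-rewrite idea using plain Java collections; the class and field names (DictDedupSketch, dstStore) are hypothetical and this is not the Kylin API.

import java.util.HashMap;
import java.util.Map;

public class DictDedupSketch {

    // Hypothetical destination store: resource path -> dictionary content.
    static final Map<String, String> dstStore = new HashMap<>();

    // Returns the path of an existing dictionary with identical content, or null if none.
    static String findDuplicate(String content) {
        for (Map.Entry<String, String> e : dstStore.entrySet()) {
            if (e.getValue().equals(content)) {
                return e.getKey();
            }
        }
        return null;
    }

    // Saves the dictionary unless an identical one already exists; returns the path to use.
    static String saveOrReuse(String srcPath, String content) {
        String dup = findDuplicate(content);
        if (dup != null) {
            return dup; // reuse the existing dictionary, do not write a second copy
        }
        dstStore.put(srcPath, content);
        return srcPath;
    }

    // Rewrites segment references from the source path to the path actually stored.
    static void rewriteSegmentDicts(Map<String, String> segmentDicts, String srcPath, String actualPath) {
        for (Map.Entry<String, String> e : segmentDicts.entrySet()) {
            if (e.getValue().equalsIgnoreCase(srcPath)) {
                e.setValue(actualPath);
            }
        }
    }

    public static void main(String[] args) {
        // destination already holds an identical dictionary under a different path
        dstStore.put("/dict/TABLE/COL/a", "v1,v2,v3");

        Map<String, String> segmentDicts = new HashMap<>();
        segmentDicts.put("TABLE.COL", "/dict/TABLE/COL/b");

        String actual = saveOrReuse("/dict/TABLE/COL/b", "v1,v2,v3");
        rewriteSegmentDicts(segmentDicts, "/dict/TABLE/COL/b", actual);

        System.out.println(segmentDicts); // {TABLE.COL=/dict/TABLE/COL/a}
    }
}

The point of the design is that dictionaries are content-addressed in practice: if the destination already has an equal dictionary, only the reference is migrated, which keeps the destination metastore small and the copy idempotent on re-runs.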