This is an automated email from the ASF dual-hosted git repository.

nju_yaho pushed a commit to tag ebay-3.1.0-release-20200701
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit cb10d1891d067ee080a695e39a990bf8008dee42
Author: Zhong, Yanghong <nju_y...@apache.org>
AuthorDate: Mon Jun 22 10:28:20 2020 +0800

    EBAY-KYLIN-1997 Add eBay specified migration tool cross DC
---
 .../migration/ECubeMigrationCrossClusterCLI.java  | 154 +++++++++++
 .../kylin/tool/migration/EDistCpRestClient.java   | 299 +++++++++++++++++++++
 2 files changed, 453 insertions(+)

diff --git a/tool/src/main/java/org/apache/kylin/tool/migration/ECubeMigrationCrossClusterCLI.java b/tool/src/main/java/org/apache/kylin/tool/migration/ECubeMigrationCrossClusterCLI.java
new file mode 100644
index 0000000..9696867
--- /dev/null
+++ b/tool/src/main/java/org/apache/kylin/tool/migration/ECubeMigrationCrossClusterCLI.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.tool.migration;
+
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.util.OptionsHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Strings;
+
+public class ECubeMigrationCrossClusterCLI extends CubeMigrationCrossClusterCLI {
+
+    private static final Logger logger = LoggerFactory.getLogger(ECubeMigrationCrossClusterCLI.class);
+
+    @SuppressWarnings("static-access")
+    public static final Option OPTION_DISTCP_DO_AS_NAME = OptionBuilder.withArgName("distCpDoAsName").hasArg()
+            .isRequired(true).withDescription("Specify do as name for distCp").create("distCpDoAsName");
+
+    @SuppressWarnings("static-access")
+    public static final Option OPTION_DISTCP_REST_URL = OptionBuilder.withArgName("distCpRestUrl").hasArg()
+            .isRequired(true).withDescription("Specify rest url for distCp").create("distCpRestUrl");
+
+    @SuppressWarnings("static-access")
+    public static final Option OPTION_DISTCP_TOKEN_URL = OptionBuilder.withArgName("distCpTokenUrl").hasArg()
+            .isRequired(true).withDescription("Specify rest url for asking token for distCp").create("distCpTokenUrl");
+
+    @SuppressWarnings("static-access")
+    public static final Option OPTION_DISTCP_TOKEN_KEY = OptionBuilder.withArgName("distCpTokenKey").hasArg()
+            .isRequired(true).withDescription("Specify api key for asking token").create("distCpTokenKey");
+
+    @SuppressWarnings("static-access")
+    public static final Option OPTION_DISTCP_TOKEN_SECRET = OptionBuilder.withArgName("distCpTokenSecret").hasArg()
+            .isRequired(true).withDescription("Specify api secret for asking token").create("distCpTokenSecret");
+
+    @SuppressWarnings("static-access")
+    public static final Option OPTION_DISTCP_FILE_ATTR_KEPT = OptionBuilder.withArgName("distCpFileAttrKept").hasArg()
+            .isRequired(true).withDescription("Specify the distCp fileAttrKept code").create("distCpFileAttrKept");
+
+    @SuppressWarnings("static-access")
+    public static final Option OPTION_DISTCP_JOB_MAX_DURATION = OptionBuilder.withArgName("distCpJobMaxDuration")
+            .hasArg().isRequired(false).withDescription("Specify distCp job max duration")
+            .create("distCpJobMaxDuration");
+    @SuppressWarnings("static-access")
+    public static final Option OPTION_DISTCP_JOB_STATUS_CHECK_INTERVAL = OptionBuilder
+            .withArgName("distCpJobCheckInterval").hasArg().isRequired(false)
+            .withDescription("Specify distCp job status check interval(minute unit)").create("distCpJobCheckInterval");
+
+    @SuppressWarnings("static-access")
+    public static final Option OPTION_FS_ACLS_ENABLED_CODE = OptionBuilder.withArgName("codeOfFSAclsEnabled").hasArg()
+            .isRequired(false).withDescription("Specify whether dfs.namenode.acls.enabled is true or false")
+            .create("codeOfFSAclsEnabled");
+
+    private static final String KeyOfFsAclsEnabled = "dfs.namenode.acls.enabled";
+    private static final String KeyOfIpcClientSocketBind = "hadoop.ebay.ipc.client.socket.bind";
+
+    private static final long DEFAULT_DISTCP_JOB_MAX_DURATION = 120L;
+    private static final long DEFAULT_DISTCP_JOB_STATUS_CHECK_INTERVAL = 1L;
+
+    protected EDistCpRestClient distCpClient;
+
+    private int codeOfFSAclsEnabled = 3;
+
+    public ECubeMigrationCrossClusterCLI() {
+        super();
+
+        options.addOption(OPTION_DISTCP_JOB_MAX_DURATION);
+        options.addOption(OPTION_DISTCP_JOB_STATUS_CHECK_INTERVAL);
+        options.addOption(OPTION_DISTCP_REST_URL);
+        options.addOption(OPTION_FS_ACLS_ENABLED_CODE);
+        options.addOption(OPTION_DISTCP_TOKEN_URL);
+        options.addOption(OPTION_DISTCP_TOKEN_KEY);
+        options.addOption(OPTION_DISTCP_TOKEN_SECRET);
+        options.addOption(OPTION_DISTCP_DO_AS_NAME);
+        options.addOption(OPTION_DISTCP_FILE_ATTR_KEPT);
+    }
+
+    public static boolean ifFSAclsEnabled(int code, int pos) {
+        int which = 1 << pos;
+        return (code & which) == which;
+    }
+
+    @Override
+    protected void init(OptionsHelper optionsHelper) throws Exception {
+        super.init(optionsHelper);
+
+        codeOfFSAclsEnabled = optionsHelper.hasOption(OPTION_FS_ACLS_ENABLED_CODE)
+                ? Integer.parseInt(optionsHelper.getOptionValue(OPTION_FS_ACLS_ENABLED_CODE))
+                : 3;
+        srcCluster.jobConf.set(KeyOfFsAclsEnabled, "" + ifFSAclsEnabled(codeOfFSAclsEnabled, 0));
+        srcCluster.hbaseConf.set(KeyOfFsAclsEnabled, "" + ifFSAclsEnabled(codeOfFSAclsEnabled, 1));
+        srcCluster.jobConf.set(KeyOfIpcClientSocketBind, "false");
+
+        long distCpJobMaxDuration = optionsHelper.hasOption(OPTION_DISTCP_JOB_MAX_DURATION)
+                ? Long.parseLong(optionsHelper.getOptionValue(OPTION_DISTCP_JOB_MAX_DURATION))
+                : DEFAULT_DISTCP_JOB_MAX_DURATION;
+
+        long distCpJobCheckInterval = optionsHelper.hasOption(OPTION_DISTCP_JOB_STATUS_CHECK_INTERVAL)
+                ? Long.parseLong(optionsHelper.getOptionValue(OPTION_DISTCP_JOB_STATUS_CHECK_INTERVAL))
+                : DEFAULT_DISTCP_JOB_STATUS_CHECK_INTERVAL;
+
+        String distCpRestUrl = optionsHelper.getOptionValue(OPTION_DISTCP_REST_URL);
+        String distCpTokenUrl = optionsHelper.getOptionValue(OPTION_DISTCP_TOKEN_URL);
+        String distCpTokenKey = optionsHelper.getOptionValue(OPTION_DISTCP_TOKEN_KEY);
+        String distCpTokenSecret = optionsHelper.getOptionValue(OPTION_DISTCP_TOKEN_SECRET);
+        String distCpDoAsName = optionsHelper.getOptionValue(OPTION_DISTCP_DO_AS_NAME);
+        int fileAttrKept = Integer.parseInt(optionsHelper.getOptionValue(OPTION_DISTCP_FILE_ATTR_KEPT));
+        distCpClient = new EDistCpRestClient(distCpRestUrl, distCpJobMaxDuration, distCpJobCheckInterval,
+                distCpTokenUrl, distCpTokenKey, distCpTokenSecret, distCpDoAsName, fileAttrKept, nThread);
+    }
+
+    @Override
+    protected void copyHDFSPath(String srcDir, Configuration srcConf, String dstDir, Configuration dstConf)
+            throws Exception {
+        Path dstPathParent = new Path(dstDir).getParent();
+        FileSystem dstFs = FileSystem.get(dstConf);
+        if (!dstFs.exists(dstPathParent)) {
+            dstFs.mkdirs(dstPathParent);
+        }
+
+        logger.info("start to copy hdfs directory from {} to {}", srcDir, dstDir);
+        String aclsEnabledStr = srcConf.get(KeyOfFsAclsEnabled);
+        boolean aclsEnabled = Strings.isNullOrEmpty(aclsEnabledStr) || Boolean.parseBoolean(aclsEnabledStr);
+        logger.info("{}: {}", KeyOfFsAclsEnabled, aclsEnabled);
+        String jobId = distCpClient.submitDistCpJob(srcDir, dstPathParent.toString(), aclsEnabled);
+        distCpClient.waitForJobFinish(jobId);
+        logger.info("copied hdfs directory from {} to {}", srcDir, dstDir);
+    }
+
+    public static void main(String[] args) {
+        ECubeMigrationCrossClusterCLI cli = new ECubeMigrationCrossClusterCLI();
+        cli.execute(args);
+    }
+}
diff --git a/tool/src/main/java/org/apache/kylin/tool/migration/EDistCpRestClient.java b/tool/src/main/java/org/apache/kylin/tool/migration/EDistCpRestClient.java
new file mode 100644
index 0000000..ee86f42
--- /dev/null
+++ b/tool/src/main/java/org/apache/kylin/tool/migration/EDistCpRestClient.java
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.tool.migration;
+
+import java.io.IOException;
+import java.security.cert.CertificateException;
+import java.security.cert.X509Certificate;
+
+import org.apache.commons.lang3.time.FastDateFormat;
+import org.apache.http.HttpResponse;
+import org.apache.http.NoHttpResponseException;
+import org.apache.http.client.HttpRequestRetryHandler;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpRequestBase;
+import org.apache.http.conn.scheme.Scheme;
+import org.apache.http.conn.ssl.SSLSocketFactory;
+import org.apache.http.conn.ssl.TrustStrategy;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.DefaultHttpClient;
+import org.apache.http.impl.conn.PoolingClientConnectionManager;
+import org.apache.http.params.BasicHttpParams;
+import org.apache.http.params.HttpConnectionParams;
+import org.apache.http.params.HttpParams;
+import org.apache.http.protocol.HttpContext;
+import org.apache.http.util.EntityUtils;
+import org.apache.kylin.common.util.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Strings;
+import com.sun.tools.javac.util.Assert;
+
+public class EDistCpRestClient {
+
+    private static final Logger logger = LoggerFactory.getLogger(EDistCpRestClient.class);
+
+    protected static final int HTTP_CONNECTION_TIMEOUT_MS = 30000;
+    protected static final int HTTP_SOCKET_TIMEOUT_MS = 120000;
+
+    protected static final long ONE_MINUTE = 60000L;
+
+    protected static final int retryCount = 3;
+
+    protected static final ObjectMapper mapper = new ObjectMapper();
+
+    protected DefaultHttpClient client;
+    private final String baseUrl;
+
+    private final long distCpJobMaxDuration;
+    private final long distCpJobCheckInterval;
+    private final ETokenGenerator tokenGenerator;
+
+    private final String doAsName;
+
+    private final int fileAttrKept;
+
+    public EDistCpRestClient(String url, long distCpJobMaxDuration, long distCpJobCheckInterval, String tokenUrl,
+            String apiKey, String apiSecret, String doAsName, int fileAttrKept) {
+        this(url, distCpJobMaxDuration, distCpJobCheckInterval, tokenUrl, apiKey, apiSecret, doAsName, fileAttrKept, 8);
+    }
+
+    public EDistCpRestClient(String url, long distCpJobMaxDuration, long distCpJobCheckInterval, String tokenUrl,
+            String apiKey, String apiSecret, String doAsName, int fileAttrKept, int maxPerRoute) {
+        this.distCpJobMaxDuration = distCpJobMaxDuration * ONE_MINUTE;
+        this.distCpJobCheckInterval = distCpJobCheckInterval * ONE_MINUTE;
+        this.baseUrl = url;
+        this.tokenGenerator = new ETokenGenerator(tokenUrl, apiKey, apiSecret);
+        this.doAsName = doAsName;
+        this.fileAttrKept = fileAttrKept;
+
+        final HttpParams httpParams = new BasicHttpParams();
+        HttpConnectionParams.setSoTimeout(httpParams, HTTP_SOCKET_TIMEOUT_MS);
+        HttpConnectionParams.setConnectionTimeout(httpParams, HTTP_CONNECTION_TIMEOUT_MS);
+
+        PoolingClientConnectionManager pccm = new PoolingClientConnectionManager();
+        pccm.setDefaultMaxPerRoute(maxPerRoute);
+        pccm.setMaxTotal(maxPerRoute);
+        client = new DefaultHttpClient(pccm, httpParams);
+        client.setHttpRequestRetryHandler(new HttpRequestRetryHandler() {
+            @Override
+            public boolean retryRequest(IOException exception, int executionCount, HttpContext context) {
+                if (executionCount > retryCount) {
+                    logger.warn("Maximum tries reached for client http pool ");
+                    return false;
+                }
+                if (exception instanceof NoHttpResponseException) {
+                    logger.warn("No response from server on " + executionCount + " call");
+                    return true;
+                }
+                return false;
+            }
+        });
+
+        // trust all certificates
+        if (baseUrl.startsWith("https")) {
+            try {
+                SSLSocketFactory sslsf = new SSLSocketFactory(new TrustStrategy() {
+                    public boolean isTrusted(final X509Certificate[] chain, String authType)
+                            throws CertificateException {
+                        // Oh, I am easy...
+                        return true;
+                    }
+                });
+                client.getConnectionManager().getSchemeRegistry().register(new Scheme("https", 443, sslsf));
+            } catch (Exception e) {
+                throw new RuntimeException("Initialize HTTPS client failed", e);
+            }
+        }
+    }
+
+    private void addHttpHeaders(HttpRequestBase method) {
+        method.addHeader("Accept", "application/json, text/plain, */*");
+        method.addHeader("Content-Type", "application/json");
+        method.addHeader("AUTH_TYPE", "KEYSTONE");
+        method.addHeader("KEYSTONE_TOKEN", tokenGenerator.generateToken());
+    }
+
+    public HttpResponse doGet(String url) throws Exception {
+        HttpGet request = new HttpGet(baseUrl + url);
+        addHttpHeaders(request);
+        return client.execute(request);
+    }
+
+    public HttpResponse doPost(String url, String jsonRequest) throws Exception {
+        HttpPost request = new HttpPost(baseUrl + url);
+        if (!Strings.isNullOrEmpty(jsonRequest)) {
+            request.setEntity(new StringEntity(jsonRequest, "UTF-8"));
+        }
+        addHttpHeaders(request);
+        return client.execute(request);
+    }
+
+    public String submitDistCpJob(String srcDir, String dstDir, boolean aclsEnabled) throws Exception {
+        StringBuilder requestJson = new StringBuilder();
+        requestJson.append("{")//
+                .append("\"src\": \"").append(srcDir).append("\"")//
+                .append(", \"dst\": \"").append(dstDir).append("\"")//
+                .append(", \"doAs\": \"").append(doAsName).append("\"");
+        if (!aclsEnabled) {
+            requestJson.append(", \"fileAttrKept\": ").append(fileAttrKept);
+        }
+        requestJson.append("}");
+
+        HttpResponse response = doPost("", requestJson.toString());
+        String responseContent = EntityUtils.toString(response.getEntity());
+        logger.info("Response for submit distcp job from {} to {}: {}", srcDir, dstDir, responseContent);
+        JsonNode root = mapper.readTree(responseContent);
+        return root.get("result").get("jobId").toString();
+    }
+
+    public void waitForJobFinish(String jobId) throws Exception {
+        long jobEndTime = System.currentTimeMillis() + distCpJobMaxDuration;
+        while (System.currentTimeMillis() < jobEndTime) {
+            String jobStatus = "1";
+            try {
+                jobStatus = checkDistCpJobStatus(jobId);
+            } catch (Exception e) {
+                logger.warn("Fail to get job status due to ", e);
+            }
+            switch (jobStatus) {
+            case "0":
+                jobEndTime = System.currentTimeMillis() + distCpJobMaxDuration;
+                break;
+            case "2":
+                return;
+            case "3":
+                throw new RuntimeException("Job " + jobId + " fails");
+            case "1":
+            default:
+            }
+            //sleep some time and then try to get job status again
+            logger.info("Job {} is still running, waiting...", jobId);
+            Thread.currentThread().sleep(distCpJobCheckInterval);
+        }
+        throw new RuntimeException(
+                "Job " + jobId + " is not finished in " + distCpJobMaxDuration / ONE_MINUTE + " minutes!");
+    }
+
+    public String checkDistCpJobStatus(String jobId) throws Exception {
+        HttpResponse response = doGet("/" + jobId);
+        String responseContent = EntityUtils.toString(response.getEntity());
+        logger.info("Response for check the status of distcp job {}: {}", jobId, responseContent);
+        JsonNode root = mapper.readTree(responseContent);
+        return root.get("result").get("status").toString();
+    }
+
+    private class ETokenGenerator {
+
+        private final String tokenUrl;
+        private final String apiKey;
+        private final String apiSecret;
+        private final FastDateFormat dateFormat;
+        private volatile long nextGenTime = 0L;
+        private volatile String token;
+
+        public ETokenGenerator(String tokenUrl, String apiKey, String apiSecret) {
+            this.tokenUrl = tokenUrl;
+            this.apiKey = apiKey;
+            this.apiSecret = apiSecret;
+            this.dateFormat = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss");
+        }
+
+        public String generateToken() {
+            if (System.currentTimeMillis() > nextGenTime || token == null) {
+                synchronized (ETokenGenerator.class) {
+                    if (System.currentTimeMillis() > nextGenTime || token == null) {
+                        try {
+                            Pair<String, Long> tokenAndTime = doGenerateToken();
+                            token = tokenAndTime.getFirst();
+                            nextGenTime = tokenAndTime.getSecond();
+                        } catch (Throwable e) {
+                            throw new RuntimeException(e);
+                        }
+                    }
+                }
+            }
+            return token;
+        }
+
+        private long parseTime(String fTimeStr) throws Exception {
+            String dateStr = fTimeStr.substring(0, 10);
+            String timeStr = fTimeStr.substring(11, 19);
+            return dateFormat.parse(dateStr + " " + timeStr).getTime();
+        }
+
+        private long updateNextGenTime(String issueTimeStr, String expireTimeStr) throws Exception {
+            long issueTime = parseTime(issueTimeStr);
+            long expireTime = parseTime(expireTimeStr);
+            return System.currentTimeMillis() + (expireTime - issueTime) / 2;
+        }
+
+        private Pair<String, Long> doGenerateToken() throws Exception {
+            HttpPost request = new HttpPost(tokenUrl);
+            request.addHeader("Accept", "application/json, text/plain, */*");
+            request.addHeader("Content-Type", "application/json");
+
+            StringBuilder requestJson = new StringBuilder();
+            requestJson.append("{")//
+                    .append("\"auth\": ")//
+                    .append("{")//
+                    .append("\"passwordCredentials\": ")//
+                    .append("{")//
+                    .append("\"username\": \"").append(apiKey).append("\"")//
+                    .append(", \"password\": \"").append(apiSecret).append("\"")//
+                    .append("}}}");
+            request.setEntity(new StringEntity(requestJson.toString(), "UTF-8"));
+
+            HttpResponse response = client.execute(request);
+            String responseContent = EntityUtils.toString(response.getEntity());
+            logger.info("Get renewed token for {}", apiKey);
+            JsonNode root = mapper.readTree(responseContent);
+            JsonNode tokenRoot = root.get("access").get("token");
+            String issueTimeStr = unwrapQuotation(tokenRoot.get("issued_at").toString());
+            String expireTimeStr = unwrapQuotation(tokenRoot.get("expires").toString());
+            long nextGenTime = updateNextGenTime(issueTimeStr, expireTimeStr);
+            return new Pair<>(unwrapQuotation(tokenRoot.get("id").toString()), nextGenTime);
+        }
+
+        private String unwrapQuotation(String qStr) {
+            return qStr.substring(1, qStr.length() - 1);
+        }
+    }
+
+    public static void main(String[] args) throws Exception {
+        Assert.check(args.length == 8);
+        String url = args[0];
+        String tokenUrl = args[1];
+        String apiKey = args[2];
+        String apiSecret = args[3];
+        String srcDir = args[4];
+        String dstDir = args[5];
+        String doAsName = args[6];
+        int fileAttrKept = Integer.parseInt(args[7]);
+        EDistCpRestClient client = new EDistCpRestClient(url, 60L, 1L, tokenUrl, apiKey, apiSecret, doAsName,
+                fileAttrKept);
+        client.tokenGenerator.generateToken();
+        String jobId = client.submitDistCpJob(srcDir, dstDir, false);
+        client.checkDistCpJobStatus(jobId);
+    }
+}