Repository: kylin Updated Branches: refs/heads/1.x-staging e72e91b47 -> e77e542c2
增加使用kerberos认证的方式获取作业执行状态，可配置 Signed-off-by: hzfen...@corp.netease.com <hzfen...@corp.netease.com> Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/e77e542c Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/e77e542c Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/e77e542c Branch: refs/heads/1.x-staging Commit: e77e542c2ef3fa39ca558b42acbd7a322c644641 Parents: e72e91b Author: å¯å® <hzfen...@corp.netease.com> Authored: Mon Sep 7 20:32:23 2015 +0800 Committer: shaofengshi <shaofeng...@apache.org> Committed: Fri Dec 25 10:41:47 2015 +0800 ---------------------------------------------------------------------- .../org/apache/kylin/common/KylinConfig.java | 6 ++ conf/kylin.properties | 4 + job/pom.xml | 8 ++ .../kylin/job/common/MapReduceExecutable.java | 2 +- .../kylin/job/tools/HadoopStatusChecker.java | 8 +- .../kylin/job/tools/HadoopStatusGetter.java | 90 +++++++++++++++++++- pom.xml | 6 ++ 7 files changed, 119 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/e77e542c/common/src/main/java/org/apache/kylin/common/KylinConfig.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/kylin/common/KylinConfig.java b/common/src/main/java/org/apache/kylin/common/KylinConfig.java index b043b2a..e0f774a 100644 --- a/common/src/main/java/org/apache/kylin/common/KylinConfig.java +++ b/common/src/main/java/org/apache/kylin/common/KylinConfig.java @@ -82,6 +82,8 @@ public class KylinConfig { public static final String KYLIN_JOB_REMOTE_CLI_WORKING_DIR = "kylin.job.remote.cli.working.dir"; public static final String KYLIN_JOB_CMD_EXTRA_ARGS = "kylin.job.cmd.extra.args"; + + public static final String KYLIN_GET_JOB_STATUS_WITH_KERBEROS = "kylin.job.status.with.kerberos"; /** * Toggle to indicate whether to use hive for 
table flattening. Default * true. @@ -419,6 +421,10 @@ public class KylinConfig { public String getMapReduceCmdExtraArgs() { return getOptional(KYLIN_JOB_CMD_EXTRA_ARGS); } + + public boolean getKylinUseKerberosAuth() { + return Boolean.valueOf(getOptional(KYLIN_GET_JOB_STATUS_WITH_KERBEROS, "false")); + } public String getOverrideHiveTableLocation(String table) { return getOptional(HIVE_TABLE_LOCATION_PREFIX + table.toUpperCase()); http://git-wip-us.apache.org/repos/asf/kylin/blob/e77e542c/conf/kylin.properties ---------------------------------------------------------------------- diff --git a/conf/kylin.properties b/conf/kylin.properties index 68db224..e196d7a 100644 --- a/conf/kylin.properties +++ b/conf/kylin.properties @@ -64,6 +64,9 @@ kylin.job.concurrent.max.limit=10 # Time interval to check hadoop job status kylin.job.yarn.app.rest.check.interval.seconds=10 +#if you should getting job status from RM with kerberos, set it true.. +kylin.job.status.with.kerberos=false + # Hive database name for putting the intermediate flat tables kylin.job.hive.database.for.intermediatetable=default @@ -142,3 +145,4 @@ kylin.monitor.ext.log.base.dir = /tmp/kylin_log1,/tmp/kylin_log2 #will create external hive table to query result csv file #will set to kylin_query_log by default if not config here kylin.monitor.query.log.parse.result.table = kylin_query_log + http://git-wip-us.apache.org/repos/asf/kylin/blob/e77e542c/job/pom.xml ---------------------------------------------------------------------- diff --git a/job/pom.xml b/job/pom.xml index c6b8b01..c62477e 100644 --- a/job/pom.xml +++ b/job/pom.xml @@ -81,6 +81,14 @@ <artifactId>jackson-databind</artifactId> </dependency> <dependency> + <groupId>commons-httpclient</groupId> + <artifactId>commons-httpclient</artifactId> + </dependency> + <dependency> + <groupId>org.apache.httpcomponents</groupId> + <artifactId>httpclient</artifactId> + </dependency> + <dependency> <groupId>com.google.guava</groupId> 
<artifactId>guava</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/kylin/blob/e77e542c/job/src/main/java/org/apache/kylin/job/common/MapReduceExecutable.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/common/MapReduceExecutable.java b/job/src/main/java/org/apache/kylin/job/common/MapReduceExecutable.java index 2d2cfe6..a2797d7 100644 --- a/job/src/main/java/org/apache/kylin/job/common/MapReduceExecutable.java +++ b/job/src/main/java/org/apache/kylin/job/common/MapReduceExecutable.java @@ -138,7 +138,7 @@ public class MapReduceExecutable extends AbstractExecutable { return new ExecuteResult(ExecuteResult.State.ERROR, "restStatusCheckUrl is null"); } String mrJobId = hadoopCmdOutput.getMrJobId(); - HadoopStatusChecker statusChecker = new HadoopStatusChecker(restStatusCheckUrl, mrJobId, output); + HadoopStatusChecker statusChecker = new HadoopStatusChecker(restStatusCheckUrl, mrJobId, output, context.getConfig()); JobStepStatusEnum status = JobStepStatusEnum.NEW; while (!isDiscarded()) { JobStepStatusEnum newStatus = statusChecker.checkStatus(); http://git-wip-us.apache.org/repos/asf/kylin/blob/e77e542c/job/src/main/java/org/apache/kylin/job/tools/HadoopStatusChecker.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/tools/HadoopStatusChecker.java b/job/src/main/java/org/apache/kylin/job/tools/HadoopStatusChecker.java index d55b423..6d741aa 100644 --- a/job/src/main/java/org/apache/kylin/job/tools/HadoopStatusChecker.java +++ b/job/src/main/java/org/apache/kylin/job/tools/HadoopStatusChecker.java @@ -24,6 +24,7 @@ import java.util.Date; import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.kylin.common.KylinConfig; import 
org.apache.kylin.job.constant.JobStepStatusEnum; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,11 +40,13 @@ public class HadoopStatusChecker { private final String yarnUrl; private final String mrJobID; private final StringBuilder output; + private final KylinConfig config; - public HadoopStatusChecker(String yarnUrl, String mrJobID, StringBuilder output) { + public HadoopStatusChecker(String yarnUrl, String mrJobID, StringBuilder output, KylinConfig config) { this.yarnUrl = yarnUrl; this.mrJobID = mrJobID; this.output = output; + this.config = config; } public JobStepStatusEnum checkStatus() { @@ -53,7 +56,8 @@ public class HadoopStatusChecker { } JobStepStatusEnum status = null; try { - final Pair<RMAppState, FinalApplicationStatus> result = new HadoopStatusGetter(yarnUrl, mrJobID).get(); + boolean useKerberosAuth = config.getKylinUseKerberosAuth(); + final Pair<RMAppState, FinalApplicationStatus> result = new HadoopStatusGetter(yarnUrl, mrJobID).get(useKerberosAuth); logger.debug("State of Hadoop job: " + mrJobID + ":" + result.getLeft() + "-" + result.getRight()); output.append(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.S").format(new Date()) + " - State of Hadoop job: " + mrJobID + ":" + result.getLeft() + " - " + result.getRight() + "\n"); http://git-wip-us.apache.org/repos/asf/kylin/blob/e77e542c/job/src/main/java/org/apache/kylin/job/tools/HadoopStatusGetter.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/tools/HadoopStatusGetter.java b/job/src/main/java/org/apache/kylin/job/tools/HadoopStatusGetter.java index a3c411b..8230d48 100644 --- a/job/src/main/java/org/apache/kylin/job/tools/HadoopStatusGetter.java +++ b/job/src/main/java/org/apache/kylin/job/tools/HadoopStatusGetter.java @@ -20,6 +20,7 @@ package org.apache.kylin.job.tools; import java.io.IOException; import java.net.MalformedURLException; +import java.security.Principal; import 
org.apache.commons.httpclient.Header; import org.apache.commons.httpclient.HttpClient; @@ -28,9 +29,23 @@ import org.apache.commons.httpclient.methods.GetMethod; import org.apache.commons.httpclient.protocol.Protocol; import org.apache.commons.httpclient.protocol.ProtocolSocketFactory; import org.apache.commons.lang.StringUtils; +import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.http.auth.AuthSchemeProvider; +import org.apache.http.auth.AuthScope; +import org.apache.http.auth.Credentials; +import org.apache.http.client.config.AuthSchemes; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.client.protocol.HttpClientContext; +import org.apache.http.config.Lookup; +import org.apache.http.config.RegistryBuilder; +import org.apache.http.impl.auth.SPNegoSchemeFactory; +import org.apache.http.impl.client.BasicCredentialsProvider; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; import org.codehaus.jackson.JsonNode; import org.codehaus.jackson.map.ObjectMapper; import org.slf4j.Logger; @@ -51,10 +66,11 @@ public class HadoopStatusGetter { this.mrJobId = mrJobId; } - public Pair<RMAppState, FinalApplicationStatus> get() throws IOException { + public Pair<RMAppState, FinalApplicationStatus> get(boolean useKerberos) throws IOException { String applicationId = mrJobId.replace("job", "application"); String url = yarnUrl.replace("${job_id}", applicationId); - JsonNode root = new ObjectMapper().readTree(getHttpResponse(url)); + String response = useKerberos ? 
getHttpResponseWithKerberosAuth(url) : getHttpResponse(url); + JsonNode root = new ObjectMapper().readTree(response); RMAppState state = RMAppState.valueOf(root.findValue("state").getTextValue()); FinalApplicationStatus finalStatus = FinalApplicationStatus.valueOf(root.findValue("finalStatus").getTextValue()); return Pair.of(state, finalStatus); @@ -121,6 +137,76 @@ public class HadoopStatusGetter { return response; } + + private static String DEFAULT_KRB5_CONFIG_LOCATION = "/etc/krb5.conf"; + private String getHttpResponseWithKerberosAuth(String url) throws IOException { + String krb5ConfigPath = System.getProperty("java.security.krb5.conf"); + if (krb5ConfigPath == null) { + krb5ConfigPath = DEFAULT_KRB5_CONFIG_LOCATION; + } + log.debug("krb5 config file is " + krb5ConfigPath); + + boolean skipPortAtKerberosDatabaseLookup = true; + System.setProperty("java.security.krb5.conf", krb5ConfigPath); + System.setProperty("sun.security.krb5.debug", "true"); + System.setProperty("javax.security.auth.useSubjectCredsOnly","false"); + Lookup<AuthSchemeProvider> authSchemeRegistry = RegistryBuilder.<AuthSchemeProvider>create() + .register(AuthSchemes.SPNEGO, new SPNegoSchemeFactory(skipPortAtKerberosDatabaseLookup)) + .build(); + + CloseableHttpClient client = HttpClients.custom().setDefaultAuthSchemeRegistry(authSchemeRegistry).build(); + HttpClientContext context = HttpClientContext.create(); + BasicCredentialsProvider credentialsProvider = new BasicCredentialsProvider(); + // This may seem odd, but specifying 'null' as principal tells java to use the logged in user's credentials + Credentials useJaasCreds = new Credentials() { + public String getPassword() { + return null; + } + public Principal getUserPrincipal() { + return null; + } + }; + credentialsProvider.setCredentials( new AuthScope(null, -1, null), useJaasCreds ); + context.setCredentialsProvider(credentialsProvider); + String responseString = null; + int count = 0; + int MAX_RETRY_TIME = 3; + while(responseString 
== null && count ++ < MAX_RETRY_TIME) { + if (url.startsWith("https://")) { + registerEasyHttps(); + } + if (url.contains("anonymous=true") == false) { + url += url.contains("?") ? "&" : "?"; + url += "anonymous=true"; + } + HttpGet httpget = new HttpGet(url); + try { + httpget.addHeader("accept", "application/json"); + CloseableHttpResponse response = client.execute(httpget,context); + String redirect = null; + org.apache.http.Header h = response.getFirstHeader("Refresh"); + if (h != null) { + String s = h.getValue(); + int cut = s.indexOf("url="); + if (cut >= 0) { + redirect = s.substring(cut + 4); + } + } + + if (redirect == null) { + responseString = IOUtils.toString(response.getEntity().getContent()); + log.debug("Job " + mrJobId + " get status check result.\n"); + } else { + url = redirect; + log.debug("Job " + mrJobId + " check redirect url " + url + ".\n"); + } + } finally { + httpget.releaseConnection(); + } + } + + return responseString; + } private static Protocol EASY_HTTPS = null; http://git-wip-us.apache.org/repos/asf/kylin/blob/e77e542c/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 06a9043..7cb5847 100644 --- a/pom.xml +++ b/pom.xml @@ -67,6 +67,7 @@ <commons-configuration.version>1.9</commons-configuration.version> <commons-daemon.version>1.0.15</commons-daemon.version> <commons-httpclient.version>3.1</commons-httpclient.version> + <httpclient.version>4.5</httpclient.version> <!-- Utility --> <log4j.version>1.2.17</log4j.version> @@ -338,6 +339,11 @@ <version>${commons-httpclient.version}</version> </dependency> <dependency> + <groupId>org.apache.httpcomponents</groupId> + <artifactId>httpclient</artifactId> + <version>${httpclient.version}</version> + </dependency> + <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>${guava.version}</version>