Repository: kylin
Updated Branches:
  refs/heads/1.x-staging e72e91b47 -> e77e542c2


增加使用kerberos认证的方式获取作业执行状态,可配置 (Add a configurable option to fetch job execution status using Kerberos authentication.)

Signed-off-by: hzfen...@corp.netease.com <hzfen...@corp.netease.com>


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/e77e542c
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/e77e542c
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/e77e542c

Branch: refs/heads/1.x-staging
Commit: e77e542c2ef3fa39ca558b42acbd7a322c644641
Parents: e72e91b
Author: 冯宇 <hzfen...@corp.netease.com>
Authored: Mon Sep 7 20:32:23 2015 +0800
Committer: shaofengshi <shaofeng...@apache.org>
Committed: Fri Dec 25 10:41:47 2015 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/common/KylinConfig.java    |  6 ++
 conf/kylin.properties                           |  4 +
 job/pom.xml                                     |  8 ++
 .../kylin/job/common/MapReduceExecutable.java   |  2 +-
 .../kylin/job/tools/HadoopStatusChecker.java    |  8 +-
 .../kylin/job/tools/HadoopStatusGetter.java     | 90 +++++++++++++++++++-
 pom.xml                                         |  6 ++
 7 files changed, 119 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/e77e542c/common/src/main/java/org/apache/kylin/common/KylinConfig.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/KylinConfig.java b/common/src/main/java/org/apache/kylin/common/KylinConfig.java
index b043b2a..e0f774a 100644
--- a/common/src/main/java/org/apache/kylin/common/KylinConfig.java
+++ b/common/src/main/java/org/apache/kylin/common/KylinConfig.java
@@ -82,6 +82,8 @@ public class KylinConfig {
     public static final String KYLIN_JOB_REMOTE_CLI_WORKING_DIR = 
"kylin.job.remote.cli.working.dir";
 
     public static final String KYLIN_JOB_CMD_EXTRA_ARGS = 
"kylin.job.cmd.extra.args";
+    
+    public static final String KYLIN_GET_JOB_STATUS_WITH_KERBEROS = 
"kylin.job.status.with.kerberos";
     /**
      * Toggle to indicate whether to use hive for table flattening. Default
      * true.
@@ -419,6 +421,10 @@ public class KylinConfig {
     public String getMapReduceCmdExtraArgs() {
         return getOptional(KYLIN_JOB_CMD_EXTRA_ARGS);
     }
+    
+    public boolean getKylinUseKerberosAuth() {
+        return Boolean.valueOf(getOptional(KYLIN_GET_JOB_STATUS_WITH_KERBEROS, 
"false"));
+    }
 
     public String getOverrideHiveTableLocation(String table) {
         return getOptional(HIVE_TABLE_LOCATION_PREFIX + table.toUpperCase());

http://git-wip-us.apache.org/repos/asf/kylin/blob/e77e542c/conf/kylin.properties
----------------------------------------------------------------------
diff --git a/conf/kylin.properties b/conf/kylin.properties
index 68db224..e196d7a 100644
--- a/conf/kylin.properties
+++ b/conf/kylin.properties
@@ -64,6 +64,9 @@ kylin.job.concurrent.max.limit=10
 # Time interval to check hadoop job status
 kylin.job.yarn.app.rest.check.interval.seconds=10
 
+# Set to true if job status must be fetched from the YARN ResourceManager using Kerberos (SPNEGO) authentication.
+kylin.job.status.with.kerberos=false
+
 # Hive database name for putting the intermediate flat tables
 kylin.job.hive.database.for.intermediatetable=default
 
@@ -142,3 +145,4 @@ kylin.monitor.ext.log.base.dir = 
/tmp/kylin_log1,/tmp/kylin_log2
 #will create external hive table to query result csv file
 #will set to kylin_query_log by default if not config here
 kylin.monitor.query.log.parse.result.table = kylin_query_log
+

http://git-wip-us.apache.org/repos/asf/kylin/blob/e77e542c/job/pom.xml
----------------------------------------------------------------------
diff --git a/job/pom.xml b/job/pom.xml
index c6b8b01..c62477e 100644
--- a/job/pom.xml
+++ b/job/pom.xml
@@ -81,6 +81,14 @@
             <artifactId>jackson-databind</artifactId>
         </dependency>
         <dependency>
+            <groupId>commons-httpclient</groupId>
+            <artifactId>commons-httpclient</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpclient</artifactId>
+        </dependency>
+        <dependency>
             <groupId>com.google.guava</groupId>
             <artifactId>guava</artifactId>
         </dependency>

http://git-wip-us.apache.org/repos/asf/kylin/blob/e77e542c/job/src/main/java/org/apache/kylin/job/common/MapReduceExecutable.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/common/MapReduceExecutable.java b/job/src/main/java/org/apache/kylin/job/common/MapReduceExecutable.java
index 2d2cfe6..a2797d7 100644
--- a/job/src/main/java/org/apache/kylin/job/common/MapReduceExecutable.java
+++ b/job/src/main/java/org/apache/kylin/job/common/MapReduceExecutable.java
@@ -138,7 +138,7 @@ public class MapReduceExecutable extends AbstractExecutable 
{
                 return new ExecuteResult(ExecuteResult.State.ERROR, 
"restStatusCheckUrl is null");
             }
             String mrJobId = hadoopCmdOutput.getMrJobId();
-            HadoopStatusChecker statusChecker = new 
HadoopStatusChecker(restStatusCheckUrl, mrJobId, output);
+            HadoopStatusChecker statusChecker = new 
HadoopStatusChecker(restStatusCheckUrl, mrJobId, output, context.getConfig());
             JobStepStatusEnum status = JobStepStatusEnum.NEW;
             while (!isDiscarded()) {
                 JobStepStatusEnum newStatus = statusChecker.checkStatus();

http://git-wip-us.apache.org/repos/asf/kylin/blob/e77e542c/job/src/main/java/org/apache/kylin/job/tools/HadoopStatusChecker.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/tools/HadoopStatusChecker.java b/job/src/main/java/org/apache/kylin/job/tools/HadoopStatusChecker.java
index d55b423..6d741aa 100644
--- a/job/src/main/java/org/apache/kylin/job/tools/HadoopStatusChecker.java
+++ b/job/src/main/java/org/apache/kylin/job/tools/HadoopStatusChecker.java
@@ -24,6 +24,7 @@ import java.util.Date;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.job.constant.JobStepStatusEnum;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -39,11 +40,13 @@ public class HadoopStatusChecker {
     private final String yarnUrl;
     private final String mrJobID;
     private final StringBuilder output;
+    private final KylinConfig config; 
 
-    public HadoopStatusChecker(String yarnUrl, String mrJobID, StringBuilder 
output) {
+    public HadoopStatusChecker(String yarnUrl, String mrJobID, StringBuilder 
output, KylinConfig config) {
         this.yarnUrl = yarnUrl;
         this.mrJobID = mrJobID;
         this.output = output;
+        this.config = config;
     }
 
     public JobStepStatusEnum checkStatus() {
@@ -53,7 +56,8 @@ public class HadoopStatusChecker {
         }
         JobStepStatusEnum status = null;
         try {
-            final Pair<RMAppState, FinalApplicationStatus> result = new 
HadoopStatusGetter(yarnUrl, mrJobID).get();
+            boolean useKerberosAuth = config.getKylinUseKerberosAuth();
+            final Pair<RMAppState, FinalApplicationStatus> result = new 
HadoopStatusGetter(yarnUrl, mrJobID).get(useKerberosAuth);
             logger.debug("State of Hadoop job: " + mrJobID + ":" + 
result.getLeft() + "-" + result.getRight());
             output.append(new SimpleDateFormat("yyyy-MM-dd 
HH:mm:ss.S").format(new Date()) + " - State of Hadoop job: " + mrJobID + ":" + 
result.getLeft() + " - " + result.getRight() + "\n");
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/e77e542c/job/src/main/java/org/apache/kylin/job/tools/HadoopStatusGetter.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/tools/HadoopStatusGetter.java b/job/src/main/java/org/apache/kylin/job/tools/HadoopStatusGetter.java
index a3c411b..8230d48 100644
--- a/job/src/main/java/org/apache/kylin/job/tools/HadoopStatusGetter.java
+++ b/job/src/main/java/org/apache/kylin/job/tools/HadoopStatusGetter.java
@@ -20,6 +20,7 @@ package org.apache.kylin.job.tools;
 
 import java.io.IOException;
 import java.net.MalformedURLException;
+import java.security.Principal;
 
 import org.apache.commons.httpclient.Header;
 import org.apache.commons.httpclient.HttpClient;
@@ -28,9 +29,23 @@ import org.apache.commons.httpclient.methods.GetMethod;
 import org.apache.commons.httpclient.protocol.Protocol;
 import org.apache.commons.httpclient.protocol.ProtocolSocketFactory;
 import org.apache.commons.lang.StringUtils;
+import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.http.auth.AuthSchemeProvider;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.Credentials;
+import org.apache.http.client.config.AuthSchemes;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.protocol.HttpClientContext;
+import org.apache.http.config.Lookup;
+import org.apache.http.config.RegistryBuilder;
+import org.apache.http.impl.auth.SPNegoSchemeFactory;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
 import org.codehaus.jackson.JsonNode;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.slf4j.Logger;
@@ -51,10 +66,11 @@ public class HadoopStatusGetter {
         this.mrJobId = mrJobId;
     }
 
-    public Pair<RMAppState, FinalApplicationStatus> get() throws IOException {
+    public Pair<RMAppState, FinalApplicationStatus> get(boolean useKerberos) 
throws IOException {
         String applicationId = mrJobId.replace("job", "application");
         String url = yarnUrl.replace("${job_id}", applicationId);
-        JsonNode root = new ObjectMapper().readTree(getHttpResponse(url));
+        String response = useKerberos ? getHttpResponseWithKerberosAuth(url) : 
getHttpResponse(url);
+        JsonNode root = new ObjectMapper().readTree(response);
         RMAppState state = 
RMAppState.valueOf(root.findValue("state").getTextValue());
         FinalApplicationStatus finalStatus = 
FinalApplicationStatus.valueOf(root.findValue("finalStatus").getTextValue());
         return Pair.of(state, finalStatus);
@@ -121,6 +137,76 @@ public class HadoopStatusGetter {
 
         return response;
     }
+    
+    private static String DEFAULT_KRB5_CONFIG_LOCATION = "/etc/krb5.conf";
+    private String getHttpResponseWithKerberosAuth(String url) throws 
IOException {
+        String krb5ConfigPath = System.getProperty("java.security.krb5.conf");
+        if (krb5ConfigPath == null) {
+            krb5ConfigPath = DEFAULT_KRB5_CONFIG_LOCATION;
+        } 
+        log.debug("krb5 config file is " + krb5ConfigPath);
+      
+        boolean skipPortAtKerberosDatabaseLookup = true;
+        System.setProperty("java.security.krb5.conf", krb5ConfigPath);
+        System.setProperty("sun.security.krb5.debug", "true");
+        System.setProperty("javax.security.auth.useSubjectCredsOnly","false");
+        Lookup<AuthSchemeProvider> authSchemeRegistry = 
RegistryBuilder.<AuthSchemeProvider>create()
+                .register(AuthSchemes.SPNEGO, new 
SPNegoSchemeFactory(skipPortAtKerberosDatabaseLookup))
+                .build();
+        
+        CloseableHttpClient client = 
HttpClients.custom().setDefaultAuthSchemeRegistry(authSchemeRegistry).build();
+        HttpClientContext context = HttpClientContext.create();
+        BasicCredentialsProvider credentialsProvider = new 
BasicCredentialsProvider();
+        // This may seem odd, but specifying 'null' as principal tells java to 
use the logged in user's credentials
+        Credentials useJaasCreds = new Credentials() {
+            public String getPassword() {
+                return null;
+            }
+            public Principal getUserPrincipal() {
+                return null;
+            }
+        };
+        credentialsProvider.setCredentials( new AuthScope(null, -1, null), 
useJaasCreds );
+        context.setCredentialsProvider(credentialsProvider);
+        String responseString = null;
+        int count = 0;
+        int MAX_RETRY_TIME = 3;
+        while(responseString == null && count ++ < MAX_RETRY_TIME) {
+            if (url.startsWith("https://")) {
+                registerEasyHttps();
+            }
+            if (url.contains("anonymous=true") == false) {
+                url += url.contains("?") ? "&" : "?";
+                url += "anonymous=true";
+            }
+            HttpGet httpget = new HttpGet(url);
+            try {
+                httpget.addHeader("accept", "application/json");
+                CloseableHttpResponse response = 
client.execute(httpget,context);
+                String redirect = null;
+                org.apache.http.Header h = response.getFirstHeader("Refresh");
+                if (h != null) {
+                    String s = h.getValue();
+                    int cut = s.indexOf("url=");
+                    if (cut >= 0) {
+                        redirect = s.substring(cut + 4);
+                    }
+                }
+    
+                if (redirect == null) {
+                    responseString = 
IOUtils.toString(response.getEntity().getContent());
+                    log.debug("Job " + mrJobId + " get status check 
result.\n");
+                } else {
+                    url = redirect;
+                    log.debug("Job " + mrJobId + " check redirect url " + url 
+ ".\n");
+                } 
+            } finally {
+                httpget.releaseConnection();
+            }
+        }
+        
+        return responseString;
+    }
 
     private static Protocol EASY_HTTPS = null;
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/e77e542c/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 06a9043..7cb5847 100644
--- a/pom.xml
+++ b/pom.xml
@@ -67,6 +67,7 @@
         <commons-configuration.version>1.9</commons-configuration.version>
         <commons-daemon.version>1.0.15</commons-daemon.version>
         <commons-httpclient.version>3.1</commons-httpclient.version>
+        <httpclient.version>4.5</httpclient.version>
 
         <!-- Utility -->
         <log4j.version>1.2.17</log4j.version>
@@ -338,6 +339,11 @@
                 <version>${commons-httpclient.version}</version>
             </dependency>
             <dependency>
+               <groupId>org.apache.httpcomponents</groupId>
+               <artifactId>httpclient</artifactId>
+               <version>${httpclient.version}</version>
+                       </dependency>
+            <dependency>
                 <groupId>com.google.guava</groupId>
                 <artifactId>guava</artifactId>
                 <version>${guava.version}</version>

Reply via email to