[ 
https://issues.apache.org/jira/browse/GOBBLIN-1308?focusedWorklogId=535033&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-535033
 ]

ASF GitHub Bot logged work on GOBBLIN-1308:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 12/Jan/21 20:52
            Start Date: 12/Jan/21 20:52
    Worklog Time Spent: 10m 
      Work Description: jhsenjaliya commented on a change in pull request #3157:
URL: https://github.com/apache/incubator-gobblin/pull/3157#discussion_r556081409



##########
File path: 
gobblin-utility/src/main/java/org/apache/gobblin/util/hadoop/TokenUtils.java
##########
@@ -280,84 +314,122 @@ private static void getJhToken(Configuration conf, 
Credentials cred) throws IOEx
     }
 
     if (jhToken == null) {
-      LOG.error("getDelegationTokenFromHS() returned null");
+      log.error("getDelegationTokenFromHS() returned null");
       throw new IOException("Unable to fetch JH token.");
     }
 
-    LOG.info("Created JH token: " + jhToken.toString());
-    LOG.info("Token kind: " + jhToken.getKind());
-    LOG.info("Token id: " + Arrays.toString(jhToken.getIdentifier()));
-    LOG.info("Token service: " + jhToken.getService());
+    log.info("Created JH token: " + jhToken.toString());
+    log.info("Token kind: " + jhToken.getKind());
+    log.info("Token id: " + Arrays.toString(jhToken.getIdentifier()));
+    log.info("Token service: " + jhToken.getService());
 
     cred.addToken(jhToken.getService(), jhToken);
   }
 
-  private static void getFsAndJtTokens(final State state, final Configuration 
conf, final Optional<String> userToProxy,
-      final Credentials cred) throws IOException, InterruptedException {
+  private static void getJtTokens(final Configuration conf, final Credentials 
cred, final Optional<String> userToProxy,
+      final State state) throws IOException, InterruptedException {
 
     if (userToProxy.isPresent()) {
       UserGroupInformation.createProxyUser(userToProxy.get(), 
UserGroupInformation.getLoginUser())
           .doAs(new PrivilegedExceptionAction<Void>() {
             @Override
             public Void run() throws Exception {
-              getFsAndJtTokensImpl(state, conf, cred);
+              getJtTokensImpl(state, conf, cred);
               return null;
             }
           });
     } else {
-      getFsAndJtTokensImpl(state, conf, cred);
+      getJtTokensImpl(state, conf, cred);
     }
   }
 
-  private static void getFsAndJtTokensImpl(final State state, final 
Configuration conf, final Credentials cred)
+  private static void getJtTokensImpl(final State state, final Configuration 
conf, final Credentials cred)
       throws IOException {
-    getHdfsToken(conf, cred);
-    if (state.contains(OTHER_NAMENODES)) {
-      getOtherNamenodesToken(state.getPropAsList(OTHER_NAMENODES), conf, cred);
-    }
     getJtToken(cred);
   }
 
-  private static void getHdfsToken(Configuration conf, Credentials cred) 
throws IOException {
-    FileSystem fs = FileSystem.get(conf);
-    LOG.info("Getting DFS token from " + fs.getUri());
-    String renewer = getMRTokenRenewerInternal(new JobConf()).toString();
-    Token<?>[] fsTokens = fs.addDelegationTokens(renewer, cred);
-    for(int i = 0; i < fsTokens.length; i++) {
-      Token<?> token = fsTokens[i];
-      String message =
-          String.format("DFS token fetched from namenode, token kind: %s, 
token service %s", token.getKind(),
-              token.getService());
-      LOG.info(message);
+  public static void getAllFSTokens(final Configuration conf, final 
Credentials cred, final String renewer,
+                                    final Optional<String> userToProxy, final 
List<String> remoteFSURIList) throws IOException, InterruptedException {
+
+    if (userToProxy.isPresent()) {
+      UserGroupInformation.createProxyUser(userToProxy.get(), 
UserGroupInformation.getLoginUser())
+              .doAs(new PrivilegedExceptionAction<Void>() {
+                @Override
+                public Void run() throws Exception {
+                  getAllFSTokensImpl(conf, cred, renewer, remoteFSURIList);
+                  return null;
+                }
+              });
+    } else {
+      getAllFSTokensImpl(conf, cred, renewer, remoteFSURIList);
+    }
+  }
+
+  public static void getAllFSTokensImpl(Configuration conf, Credentials cred, 
String renewer, List<String> remoteFSURIList) {
+    try {
+      FileSystem fs = FileSystem.get(conf);
+      if (StringUtils.isEmpty(renewer)) {
+        renewer = getMRTokenRenewerInternal(new JobConf()).toString();
+        log.info("No renewer specified for FS: {}, taking default renewer: 
{}",  fs.getUri(), renewer);
+      }
+
+      log.debug("Getting HDFS token for" + fs.getUri() + " with renewer: " + 
renewer);
+      Token<?>[] fsTokens = fs.addDelegationTokens(renewer, cred);
+      if (fsTokens != null) {
+        for (Token<?> token : fsTokens) {
+          log.info("FS Uri: " + fs.getUri() + " token: " + token);
+        }
+      }
+
+      // Handle remote namenodes if any
+      if(remoteFSURIList !=null && remoteFSURIList.size() >0){
+        getRemoteFSTokenFromURI(conf, cred, remoteFSURIList, renewer);
+      }
+
+      log.debug("All credential tokens: " + cred.getAllTokens());
+    } catch (IOException e) {
+      log.error("Error getting or creating HDFS token with renewer: "+ 
renewer);
     }
+
   }
 
-  private static void getOtherNamenodesToken(List<String> otherNamenodes, 
Configuration conf, Credentials cred)
+  public static void getRemoteFSTokenFromURI(Configuration conf, Credentials 
cred, List<String> otherNamenodes, String renewer)
       throws IOException {
-    LOG.info(OTHER_NAMENODES + ": " + otherNamenodes);
+    log.debug("Getting tokens for other namenodes: " + otherNamenodes);
     Path[] ps = new Path[otherNamenodes.size()];
     for (int i = 0; i < ps.length; i++) {
       ps[i] = new Path(otherNamenodes.get(i).trim());
+      FileSystem otherNameNodeFS = ps[i].getFileSystem(conf);
+
+      if (StringUtils.isEmpty(renewer)) {

Review comment:
       Relooking at this logic: earlier there was no way to specify a renewer. 
Now that I have added a way to specify the renewer, we have to fall back to the 
default implementation (which is to take renewer = `getMRTokenRenewerInternal(new 
JobConf()).toString();`), so it will fall into the original implementation if the 
user does not specify a renewer. We can also add an overloaded method 
`getAllFSTokens` that does not take a renewer at all, to make it look as it did earlier.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 535033)
    Time Spent: 3h  (was: 2h 50m)

> Gobblin's kerberos token management for remote clusters
> -------------------------------------------------------
>
>                 Key: GOBBLIN-1308
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1308
>             Project: Apache Gobblin
>          Issue Type: Improvement
>    Affects Versions: 0.15.0
>            Reporter: Jay Sen
>            Priority: Major
>             Fix For: 0.16.0
>
>          Time Spent: 3h
>  Remaining Estimate: 0h
>
> Gobblin's Hadoop token/key management: 
>  Problem: Gobblin only maintains local cluster tokens when key management is 
> enabled, and does not have the capability to manage tokens for remote Hadoop 
> clusters. (Based on my conversation with many folks here, the token files can 
> be made available externally, but that would require an external system 
> running on cron or something.)
> Solution: add remote cluster token management in Gobblin, where remote 
> clusters' keys can be managed the same way it manages the local cluster's keys.
>  
> The config looks like the following
> ( Changes the enable.key.management config to key.management.enabled )
>  
> {code:java}
> gobblin.hadoop.key.management {
>  enabled = true
>  remote.clusters = [ ${gobblin_sync_systems.hadoop_cluster1}, 
> ${gobblin_sync_systems.hadoop_cluster2} ]
> }
> // These Gobblin platform configurations can be moved to database for other 
> use-cases, but this layout helps make the platform modular for each 
> connector.
> gobblin_sync_systems {
>  hadoop_cluster1 {
>  // if Hadoop config path is specified, the FileSystem will be created based 
> on all the xml config provided here, which has all the required info.
>  hadoop_config_path = "file:///etc/hadoop_cluster1/hadoop/config"
>  // If hadoop config path is not specified, you can still specify the 
> specific nodes for the specific type of tokens
>  namenode_uri = ["hdfs://nn1.hadoop_cluster1.example.com:8020", 
> "hdfs://nn2.hadoop_cluster1.example.com:8020"]
>  kms_nodes = [ "kms1.hadoop_cluster1.example.com:9292", 
> "kms2.hadoop_cluster1.example.com:9292" ]
>  }
>  hadoop_cluster2 {
>  hadoop_config_path = "file:///etc/hadoop_cluster2/hadoop/config"
>  namenode_uri = ["hdfs://nn1.hadoop_cluster2.example.com:8020", 
> "hdfs://nn2.hadoop_cluster2.example.com:8020"]
>  kms_nodes = [ "kms1.hadoop_cluster2.example.com:9292", 
> "kms2.hadoop_cluster2.example.com:9292" ]
>  }
> }{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to