jhsenjaliya commented on a change in pull request #3157:
URL: https://github.com/apache/incubator-gobblin/pull/3157#discussion_r534559063



##########
File path: 
gobblin-utility/src/main/java/org/apache/gobblin/util/hadoop/TokenUtils.java
##########
@@ -280,84 +314,122 @@ private static void getJhToken(Configuration conf, 
Credentials cred) throws IOEx
     }
 
     if (jhToken == null) {
-      LOG.error("getDelegationTokenFromHS() returned null");
+      log.error("getDelegationTokenFromHS() returned null");
       throw new IOException("Unable to fetch JH token.");
     }
 
-    LOG.info("Created JH token: " + jhToken.toString());
-    LOG.info("Token kind: " + jhToken.getKind());
-    LOG.info("Token id: " + Arrays.toString(jhToken.getIdentifier()));
-    LOG.info("Token service: " + jhToken.getService());
+    log.info("Created JH token: " + jhToken.toString());
+    log.info("Token kind: " + jhToken.getKind());
+    log.info("Token id: " + Arrays.toString(jhToken.getIdentifier()));
+    log.info("Token service: " + jhToken.getService());
 
     cred.addToken(jhToken.getService(), jhToken);
   }
 
-  private static void getFsAndJtTokens(final State state, final Configuration 
conf, final Optional<String> userToProxy,
-      final Credentials cred) throws IOException, InterruptedException {
+  private static void getJtTokens(final Configuration conf, final Credentials 
cred, final Optional<String> userToProxy,
+      final State state) throws IOException, InterruptedException {
 
     if (userToProxy.isPresent()) {
       UserGroupInformation.createProxyUser(userToProxy.get(), 
UserGroupInformation.getLoginUser())
           .doAs(new PrivilegedExceptionAction<Void>() {
             @Override
             public Void run() throws Exception {
-              getFsAndJtTokensImpl(state, conf, cred);
+              getJtTokensImpl(state, conf, cred);
               return null;
             }
           });
     } else {
-      getFsAndJtTokensImpl(state, conf, cred);
+      getJtTokensImpl(state, conf, cred);
     }
   }
 
-  private static void getFsAndJtTokensImpl(final State state, final 
Configuration conf, final Credentials cred)
+  private static void getJtTokensImpl(final State state, final Configuration 
conf, final Credentials cred)
       throws IOException {
-    getHdfsToken(conf, cred);
-    if (state.contains(OTHER_NAMENODES)) {
-      getOtherNamenodesToken(state.getPropAsList(OTHER_NAMENODES), conf, cred);
-    }
     getJtToken(cred);
   }
 
-  private static void getHdfsToken(Configuration conf, Credentials cred) 
throws IOException {
-    FileSystem fs = FileSystem.get(conf);
-    LOG.info("Getting DFS token from " + fs.getUri());
-    String renewer = getMRTokenRenewerInternal(new JobConf()).toString();
-    Token<?>[] fsTokens = fs.addDelegationTokens(renewer, cred);
-    for(int i = 0; i < fsTokens.length; i++) {
-      Token<?> token = fsTokens[i];
-      String message =
-          String.format("DFS token fetched from namenode, token kind: %s, 
token service %s", token.getKind(),
-              token.getService());
-      LOG.info(message);
+  public static void getAllFSTokens(final Configuration conf, final 
Credentials cred, final String renewer,
+                                    final Optional<String> userToProxy, final 
List<String> remoteFSURIList) throws IOException, InterruptedException {
+
+    if (userToProxy.isPresent()) {
+      UserGroupInformation.createProxyUser(userToProxy.get(), 
UserGroupInformation.getLoginUser())
+              .doAs(new PrivilegedExceptionAction<Void>() {
+                @Override
+                public Void run() throws Exception {
+                  getAllFSTokensImpl(conf, cred, renewer, remoteFSURIList);
+                  return null;
+                }
+              });
+    } else {
+      getAllFSTokensImpl(conf, cred, renewer, remoteFSURIList);
+    }
+  }
+
+  public static void getAllFSTokensImpl(Configuration conf, Credentials cred, 
String renewer, List<String> remoteFSURIList) {
+    try {
+      FileSystem fs = FileSystem.get(conf);
+      if (StringUtils.isEmpty(renewer)) {
+        renewer = getMRTokenRenewerInternal(new JobConf()).toString();
+        log.info("No renewer specified for FS: {}, taking default renewer: 
{}",  fs.getUri(), renewer);
+      }
+
+      log.debug("Getting HDFS token for" + fs.getUri() + " with renewer: " + 
renewer);
+      Token<?>[] fsTokens = fs.addDelegationTokens(renewer, cred);
+      if (fsTokens != null) {
+        for (Token<?> token : fsTokens) {
+          log.info("FS Uri: " + fs.getUri() + " token: " + token);
+        }
+      }
+
+      // Handle remote namenodes if any
+      if(remoteFSURIList !=null && remoteFSURIList.size() >0){
+        getRemoteFSTokenFromURI(conf, cred, remoteFSURIList, renewer);
+      }
+
+      log.debug("All credential tokens: " + cred.getAllTokens());
+    } catch (IOException e) {
+      log.error("Error getting or creating HDFS token with renewer: "+ 
renewer);
     }
+
   }
 
-  private static void getOtherNamenodesToken(List<String> otherNamenodes, 
Configuration conf, Credentials cred)
+  public static void getRemoteFSTokenFromURI(Configuration conf, Credentials 
cred, List<String> otherNamenodes, String renewer)
       throws IOException {
-    LOG.info(OTHER_NAMENODES + ": " + otherNamenodes);
+    log.debug("Getting tokens for other namenodes: " + otherNamenodes);
     Path[] ps = new Path[otherNamenodes.size()];
     for (int i = 0; i < ps.length; i++) {
       ps[i] = new Path(otherNamenodes.get(i).trim());
+      FileSystem otherNameNodeFS = ps[i].getFileSystem(conf);
+
+      if (StringUtils.isEmpty(renewer)) {
+        TokenCache.obtainTokensForNamenodes(cred, ps, conf);
+      } else {
+        final Token<?>[] tokens = otherNameNodeFS.addDelegationTokens(renewer, 
cred);
+        if (tokens != null) {
+          for (Token<?> token : tokens) {
+            log.info("Got dt token for " + otherNameNodeFS.getUri() + "; " + 
token);
+          }
+        }
+      }
     }
-    TokenCache.obtainTokensForNamenodes(cred, ps, conf);

Review comment:
       yes, along with some refactoring that also allows getting only FS token 
vs all types of tokens. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to